Spark Dataframe to MDS#
In this tutorial, we will demonstrate how to use the streaming Spark converter to convert a Spark dataframe into a StreamingDataset. Users have the option to pass a preprocessing job, such as a tokenizer, to the converter, which is useful when materializing the intermediate dataframe is time consuming or requires extra development.
Tutorial Covers#
Installation of libraries
[Basic use-case] Convert spark dataframe to MDS format
[Advanced use-case] Convert spark dataframe into tokenized format and then to MDS format.
Setup#
Let's start by making sure the right packages are installed and imported. We need to install the mosaicml-streaming
package, which installs the dependencies needed to run this tutorial.
[ ]:
%pip install --upgrade fsspec datasets transformers
[ ]:
%pip install mosaicml-streaming
[ ]:
%pip install pyspark==3.4.1
[ ]:
import os
import shutil
from typing import Any, Sequence, Dict, Iterable, Optional
from pyspark.sql import SparkSession
import pandas as pd
import numpy as np
from tempfile import mkdtemp
import datasets as hf_datasets
from transformers import AutoTokenizer, PreTrainedTokenizerBase
We'll be using Streaming's dataframeToMDS()
method, which converts a dataframe into the streaming MDS format.
[ ]:
from streaming.base.converters import dataframeToMDS
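For orientation, a typical call looks like the sketch below: mds_kwargs are forwarded to the MDS writer (e.g. out and columns), while udf_iterable/udf_kwargs optionally apply a per-partition preprocessing function. The exact signature may vary slightly between Streaming versions, so treat this as an illustration rather than the definitive API; the real calls appear later in this tutorial.
[ ]:
# Illustrative sketch only -- the actual conversions are run later in this tutorial.
#
# dataframeToMDS(
#     spark_df,                 # the Spark dataframe to convert
#     merge_index=True,         # merge per-partition index.json files into one
#     mds_kwargs={'out': '/tmp/mds', 'columns': {'number': 'int64', 'words': 'str'}},
#     udf_iterable=None,        # optional preprocessing generator (see the advanced use-case)
#     udf_kwargs=None,          # keyword arguments forwarded to udf_iterable
# )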
[Basic use-case] Convert spark dataframe to MDS format#
Steps:

1. Create a Synthetic NLP dataset.
2. Store the above dataset as a parquet file.
3. Load the parquet file as a spark dataframe.
4. Convert the spark dataframe to MDS format.
5. Load the MDS dataset and look at the output.
Create a Synthetic NLP dataset#
In this tutorial, we will be creating a synthetic number-saying dataset, i.e. converting numbers from digits to words; for example, the number 123
would be spelled as one hundred twenty three
. The numbers are generated randomly with a positive/negative sign.
Let's define a utility class that generates this synthetic number-saying dataset.
[ ]:
class NumberAndSayDataset:
    """Generate a synthetic number-saying dataset.

    Converts a number from digits to words, for example, the number 123 would be spelled
    as `one hundred twenty three`. The numbers are generated randomly and the supported
    range is approximately +/- 99 million.

    Args:
        num_samples (int): Number of samples. Defaults to 100.
        column_names (list[str]): A list of feature and target names. Defaults to
            ['number', 'words'].
        seed (int): Seed value for deterministic randomness. Defaults to 987.
    """

    ones = (
        'zero one two three four five six seven eight nine ten eleven twelve thirteen fourteen ' +
        'fifteen sixteen seventeen eighteen nineteen').split()

    tens = 'twenty thirty forty fifty sixty seventy eighty ninety'.split()

    def __init__(self,
                 num_samples: int = 100,
                 column_names: list[str] = ['number', 'words'],
                 seed: int = 987) -> None:
        self.num_samples = num_samples
        self.column_encodings = ['int', 'str']
        self.column_sizes = [8, None]
        self.column_names = column_names
        self._index = 0
        self.seed = seed

    def __len__(self) -> int:
        return self.num_samples

    def _say(self, i: int) -> list[str]:
        if i < 0:
            return ['negative'] + self._say(-i)
        elif i <= 19:
            return [self.ones[i]]
        elif i < 100:
            return [self.tens[i // 10 - 2]] + ([self.ones[i % 10]] if i % 10 else [])
        elif i < 1_000:
            return [self.ones[i // 100], 'hundred'] + (self._say(i % 100) if i % 100 else [])
        elif i < 1_000_000:
            return self._say(i // 1_000) + ['thousand'] + (self._say(i % 1_000) if i % 1_000 else [])
        elif i < 1_000_000_000:
            return self._say(i // 1_000_000) + ['million'] + (self._say(i % 1_000_000) if i % 1_000_000 else [])
        else:
            assert False

    def _get_number(self) -> int:
        # 80% of samples are positive; magnitudes go up to roughly 99 million.
        sign = (np.random.random() < 0.8) * 2 - 1
        mag = 10**np.random.uniform(1, 4) - 10
        return sign * int(mag**2)

    def __iter__(self):
        return self

    def __next__(self) -> dict[str, Any]:
        if self._index >= self.num_samples:
            raise StopIteration
        number = self._get_number()
        words = ' '.join(self._say(number))
        self._index += 1
        return {
            self.column_names[0]: number,
            self.column_names[1]: words,
        }

    @property
    def seed(self) -> int:
        return self._seed

    @seed.setter
    def seed(self, value: int) -> None:
        self._seed = value  # pyright: ignore
        np.random.seed(self._seed)
Store the dataset as a parquet file#
[ ]:
# Create a temporary directory
local_dir = mkdtemp()
syn_dataset = NumberAndSayDataset()
df = pd.DataFrame.from_dict([record for record in syn_dataset])
df.to_parquet(os.path.join(local_dir, 'synthetic_dataset.parquet'))
Load the parquet file as spark dataframe#
[ ]:
spark = SparkSession.builder.getOrCreate()
pdf = spark.read.parquet(os.path.join(local_dir, 'synthetic_dataset.parquet'))
Take a peek at the spark dataframe
[ ]:
pdf.show(5, truncate=False)
Convert the spark dataframe to MDS format#
[ ]:
# Empty the MDS output directory
out_path = os.path.join(local_dir, 'mds')
shutil.rmtree(out_path, ignore_errors=True)
mds_kwargs = {'out': out_path, 'columns': {'number': 'int64', 'words':'str'}}
# Convert the dataframe to MDS format. Repartitioning into 4 parts gives one part per worker, and `merge_index=True` merges the `index.json` files from the 4 sub-directories into one in the parent directory.
dataframeToMDS(pdf.repartition(4), merge_index=True, mds_kwargs=mds_kwargs)
Let's check the file structure of the output MDS dataset. One can see four sub-directories and one index.json
file. The index.json
file contains the metadata for all four sub-directories.
[ ]:
%ls {out_path}
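If you'd like to inspect the merged metadata itself, you can print the index.json file directly (we do the same in the advanced use-case below); it lists every shard written by the four workers:
[ ]:
%cat {out_path + '/index.json'}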
Load the MDS dataset using StreamingDataset and look at the output.#
[ ]:
from torch.utils.data import DataLoader
import streaming
from streaming import StreamingDataset
# clean stale shared memory if any
streaming.base.util.clean_stale_shared_memory()
dataset = StreamingDataset(local=out_path, remote=None, batch_size=2, predownload=4)
dataloader = DataLoader(dataset, batch_size=2, num_workers=1)
for i, data in enumerate(dataloader):
    print(data)
    # Display only the first 10 batches
    if i == 9:
        break
[Advanced use-case] Convert spark dataframe into tokenized format and then to MDS format#
Steps:

1. [Same as above] Create a Synthetic NLP dataset.
2. [Same as above] Store the above dataset as a parquet file.
3. [Same as above] Load the parquet file as a spark dataframe.
4. Create a user defined function which modifies the dataframe.
5. Convert the modified data to MDS format.
6. Load the MDS dataset and look at the output.
Create a user defined function which modifies the dataframe#
The user defined function must be an iterable (generator) function that yields a dictionary whose key is the column name and whose value is the output for that column. For example, in this tutorial, the key is tokens and the value is the tokenized output in bytes. When an iterable function is supplied, the user takes full responsibility for providing the correct columns
argument; in the case below, it should be
columns={'tokens': 'bytes'}
where tokens is the key created by the udf_iterable, and bytes
represents the format of the field so that MDS chooses the proper encoding method.
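To make that contract concrete, here is a minimal, hypothetical skeleton of such an iterable function. The column name my_column and the passthrough logic are purely illustrative; the real tokenizing function used in this tutorial is defined further below.
[ ]:
from typing import Dict, Iterable

import numpy as np
import pandas as pd

def minimal_udf(df: pd.DataFrame, **kwargs) -> Iterable[Dict[str, bytes]]:
    """A hypothetical, minimal udf_iterable: yield one dict per output sample.

    The dict keys must match the keys of the `columns` entry passed via mds_kwargs,
    e.g. columns={'my_column': 'bytes'} for this sketch. `kwargs` receives udf_kwargs.
    """
    for _, row in df.iterrows():
        # Encode whatever this sample should contain as bytes (or another MDS-supported type).
        yield {'my_column': np.asarray([row['number']]).tobytes()}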
Take a peek at the spark dataframe
[ ]:
pdf.show(5, truncate=False)
Convert the spark dataframe to MDS format#
This time we supply the user defined iterable function and the associated function arguments. For the purpose of demonstration, the user defined tokenization function pandas_processing_fn
is largely simplified. For practical applications, users may want more involved preprocessing steps. For dataset concatenation and more preprocessing examples, users are referred to Mosaic's LLM Foundry.
[ ]:
from typing import Dict, Iterable

import datasets as hf_datasets
import numpy as np
import pandas as pd
from transformers import AutoTokenizer

def pandas_processing_fn(df: pd.DataFrame, **args) -> Iterable[Dict[str, bytes]]:
    """Tokenize the `words` column of a pandas DataFrame and yield fixed-length token samples.

    Parameters:
    -----------
    df : pandas.DataFrame
        The input pandas DataFrame that needs to be processed.
    **args : keyword arguments
        Additional tokenization arguments, e.g. `tokenizer`, `concat_tokens`, and `split`.

    Yields:
    -------
    Dict[str, bytes]
        A dictionary with the key `tokens` and the concatenated token ids encoded as bytes.
    """
    hf_dataset = hf_datasets.Dataset.from_pandas(df=df, split=args['split'])
    tokenizer = AutoTokenizer.from_pretrained(args['tokenizer'])
    # we will enforce length, so suppress warnings about sequences too long for the model
    tokenizer.model_max_length = int(1e30)
    max_length = args['concat_tokens']

    buffer = []
    for sample in hf_dataset:
        encoded = tokenizer(sample['words'],
                            truncation=False,
                            padding=False)
        iids = encoded['input_ids']
        buffer = buffer + iids
        while len(buffer) >= max_length:
            concat_sample = buffer[:max_length]
            # keep the leftover tokens so they are concatenated with the next sample
            buffer = buffer[max_length:]
            yield {
                # convert to bytes to store in MDS binary format
                'tokens': np.asarray(concat_sample).tobytes()
            }
[ ]:
# Empty the MDS output directory
out_path = os.path.join(local_dir, 'mds')
shutil.rmtree(out_path, ignore_errors=True)
# Provide MDS keyword arguments. Ensure the `columns` field matches the output of the iterable function (the tokenizer in this example)
mds_kwargs = {'out': out_path, 'columns': {'tokens': 'bytes'}}
# Tokenizer arguments
udf_kwargs = {
'concat_tokens': 4,
'tokenizer': 'EleutherAI/gpt-neox-20b',
'eos_text': '<|endoftext|>',
'compression': 'zstd',
'split': 'train',
'no_wrap': False,
'bos_text': '',
}
# Convert the dataframe to MDS format. Samples are fetched from the dataframe, tokenized, and then written in MDS format.
# Repartitioning into 4 parts gives one part per worker, and `merge_index=True` merges the `index.json` files from the 4 sub-directories into one in the parent directory.
dataframeToMDS(pdf.repartition(4), merge_index=True, mds_kwargs=mds_kwargs, udf_iterable=pandas_processing_fn, udf_kwargs=udf_kwargs)
Let's check the file structure of the output MDS dataset. One can see four sub-directories and one index.json
file. The index.json
file contains the metadata for all four sub-directories.
[ ]:
%ls {out_path}
[ ]:
%cat {out_path +'/index.json'}
Load the MDS dataset using StreamingDataset and look at the output.#
[ ]:
from torch.utils.data import DataLoader
import streaming
from streaming import StreamingDataset
# clean stale shared memory if any
streaming.base.util.clean_stale_shared_memory()
dataset = StreamingDataset(local=out_path, remote=None, batch_size=2, predownload=4)
dataloader = DataLoader(dataset, batch_size=2, num_workers=1)
for i, data in enumerate(dataloader):
    print(data)
    # Display only the first 10 batches
    if i == 9:
        break
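Each tokens entry comes back as raw bytes, because that is how pandas_processing_fn stored the concatenated token ids. Assuming np.asarray used a 64-bit integer dtype on your platform (worth verifying), you can recover and detokenize the ids roughly like this:
[ ]:
import numpy as np
from transformers import AutoTokenizer

sample = dataset[0]
# Recover the token ids from the raw bytes written by pandas_processing_fn.
# np.int64 is an assumption: it matches np.asarray's default integer dtype on most 64-bit platforms.
token_ids = np.frombuffer(sample['tokens'], dtype=np.int64)
print(token_ids)

# Decode back to text with the same tokenizer used during conversion.
tokenizer = AutoTokenizer.from_pretrained('EleutherAI/gpt-neox-20b')
print(tokenizer.decode(token_ids))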
Cleanup#
That's it. No need to hang on to the files created by the tutorial…
[ ]:
shutil.rmtree(out_path, ignore_errors=True)
shutil.rmtree(local_dir, ignore_errors=True)
What next?#
You've now seen an in-depth look at how to convert a Spark dataframe to MDS format and how to load the same MDS dataset for model training or for your own use-case.
To continue learning about Streaming, please continue to explore our examples!
Come get involved with MosaicML!#
We'd love for you to get involved with the MosaicML community in any of these ways:
Star Streaming on GitHub#
Help make others aware of our work by starring Streaming on GitHub.
Join the MosaicML Slack#
Head on over to the MosaicML Slack to join other ML efficiency enthusiasts. Come for the paper discussions, stay for the memes!
Contribute to Streaming#
Is there a bug you noticed or a feature you'd like? File an issue or make a pull request!