How to use datasets from ptls.data_load.datasets
These datasets (torch.utils.data.Dataset) provide an interface to the data.
For data prepared in memory use:
- MemoryMapDataset with i_filters
- AugmentationDataset with f_augmentations if needed
- an endpoint map dataset from ptls.frames
For small (map mode) parquet data use:
- ParquetDataset with i_filters
- PersistDataset
- AugmentationDataset with f_augmentations if needed
- an endpoint map dataset from ptls.frames
For large (iterable mode) parquet data use:
- ParquetDataset with i_filters
- AugmentationDataset with f_augmentations if needed
- an endpoint iterable dataset from ptls.frames (see the sketch below)
Other dataset orders and combinations are possible but not tested.
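As an illustration of the third pipeline (large parquet data), here is a minimal sketch that assumes a CoLES task as the endpoint, using ColesIterableDataset and the SampleSlices splitter from ptls.frames.coles. The file path, filter thresholds and splitter parameters are illustrative only, and any other endpoint dataset from ptls.frames can be used the same way.
from ptls.data_load.datasets import ParquetDataset, AugmentationDataset
from ptls.data_load.augmentations import DropoutTrx
from ptls.data_load.iterable_processing import SeqLenFilter
from ptls.frames.coles import ColesIterableDataset
from ptls.frames.coles.split_strategy import SampleSlices

# ParquetDataset (iterable) -> AugmentationDataset -> endpoint iterable dataset.
# Records are assumed to contain an 'event_time' field, which CoLES uses by default.
train_data = ColesIterableDataset(
    data=AugmentationDataset(
        data=ParquetDataset(
            ['data/train.parquet'],  # hypothetical list of parquet files
            i_filters=[SeqLenFilter(min_seq_len=25)],
        ),
        f_augmentations=[DropoutTrx(trx_dropout=0.01)],
    ),
    splitter=SampleSlices(split_count=5, cnt_min=25, cnt_max=200),
)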
Simple example
A list of feature dicts is a simple example of data.
A Python list has the same interface as a map-style torch.Dataset, so you can pass it directly to a DataLoader.
import torch
from ptls.data_load.utils import collate_feature_dict

data_list = [
    {
        'mcc': torch.arange(seq_len),
        'id': f'user_{i}'
    }
    for i, seq_len in enumerate([4, 3, 6])
]
dl = torch.utils.data.DataLoader(
    dataset=data_list,
    collate_fn=collate_feature_dict,
    batch_size=2,
)
for batch in dl:
    print(batch.payload, batch.seq_lens, sep='\n')
In this example we use a simple list as the dataset.
Sometimes you need to modify the data in the dataset. We propose a filter approach for this.
map and iterable
These are the two types of torch datasets.
See the PyTorch documentation on dataset types for more about map and iterable datasets.
The DataLoader chooses the way of iteration based on the type of its dataset.
In our pipeline the DataLoader works with an endpoint dataset from ptls.frames.
So the type of the endpoint dataset from ptls.frames determines the way of iteration.
A map dataset provides better shuffling. An iterable dataset requires less memory.
Warning for multiprocessing dataloader
Each worker uses the same source data.
A map-style dataloader knows the dataset len and uses a sampler to randomly split all indexes from range(0, len) between workers, so each worker uses its own part of the data.
An iterable dataloader can only iterate over the source data. By default each worker iterates over the same data, and the output is multiplied by the worker count.
To avoid this, iterable datasets should implement a way to split the data between workers, as shown in the sketch below.
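A minimal sketch of such a split, based on the standard torch.utils.data.get_worker_info() mechanism. The WorkerSplitIterableDataset class and the round-robin rule are illustrative, not part of ptls.
import torch
from torch.utils.data import IterableDataset, get_worker_info

class WorkerSplitIterableDataset(IterableDataset):
    """Illustrative iterable dataset that splits source records between workers."""
    def __init__(self, data):
        self.data = data  # a list of records, available to every worker

    def __iter__(self):
        info = get_worker_info()
        worker_id = 0 if info is None else info.id
        num_workers = 1 if info is None else info.num_workers
        # round-robin: each worker yields every num_workers-th record
        for i, rec in enumerate(self.data):
            if i % num_workers == worker_id:
                yield rec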
Multiprocessing split implementation:
- ParquetDataset implements the split and works correctly
- i_filters and f_augmentations don't hold any data, so they work correctly
- iterable endpoint datasets work correctly with an iterable source
- iterable endpoint datasets multiply the data with a map source
- PersistDataset iterates over its input during initialisation. Usually this happens outside the dataloader, in the single main process, so it works correctly.
i_filters and f_augmentations
- i_filters - iterable filters
- f_augmentations - augmentation functions
Filters
ptls provides filters for dataset transformation. All of them are in ptls.data_load.iterable_processing.
These filters are implemented in generator style. Call a filter object to get a generator that yields modified records.
from ptls.data_load.iterable_processing import SeqLenFilter
i_filter = SeqLenFilter(min_seq_len=4)
for rec in i_filter(data_list):
    print(rec)
There were 3 records in the list; 2 remain, because SeqLenFilter drops short sequences.
Many kinds of filters are possible: dropping records, multiplying records, transforming records.
i_filters can be chained, and datasets provide a convenient way to do it.
Many datasets in ptls.data_load.datasets support i_filters.
They take i_filters as a list of iterable_processing objects. You can also chain filters manually, as in the sketch below.
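A minimal sketch of manual chaining, reusing data_list from above together with the SeqLenFilter and FeatureRename filters from ptls.data_load.iterable_processing. Each filter wraps the generator produced by the previous one.
from ptls.data_load.iterable_processing import SeqLenFilter, FeatureRename

records = data_list
for i_filter in [SeqLenFilter(min_seq_len=4), FeatureRename({'id': 'user_id'})]:
    records = i_filter(records)  # each call returns a new generator over the previous one
for rec in records:
    print(rec)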
Augmentations
Sometimes we have to change items from the training data. This is what augmentations do.
They are in ptls.data_load.augmentations.
Example:
from ptls.data_load.augmentations import RandomSlice
f_augmentation = RandomSlice(min_len=4, max_len=10)
for rec in data_list:
    new_rec = f_augmentation(rec)
    print(new_rec)
Here the RandomSlice augmentation takes a random slice from each source record.
Compare
| i_filter | f_augmentation |
|---|---|
| May change a record. The result is always the same | May change a record. The result is random |
| Place it before the persist stage to run it once and save CPU resources | Don't place it before the persist stage, because that would remove the randomness |
| Can delete items | Cannot delete items |
| Can yield new items | Cannot create new items |
| Works as a generator and requires iterable processing | Works as a function, so it fits both map and iterable processing |
In memory data
In-memory data is a common case. The data can be a list or a generator of feature dicts.
import torch
import random

data_list = [
    {
        'mcc': torch.arange(seq_len),
        'id': f'user_{i}'
    }
    for i, seq_len in enumerate([4, 3, 6])
]

def data_gen(n):
    for i in range(n):
        seq_len = random.randint(4, 8)
        yield {
            'mcc': torch.arange(seq_len),
            'id': f'user_{i}'
        }
ptls.data_load.datasets.MemoryMapDataset:
- implements a map dataset
- iterates over the data and stores it in an internal list
- looks like a list
ptls.data_load.datasets.MemoryIterableDataset:
- implements an iterable dataset
- just iterates over the data
- looks like a generator
Warning
Currently MemoryIterableDataset doesn't support the initial data split between workers. We don't recommend using it without modification.
Both datasets support any kind of input: a list or a generator. As all datasets support the same format (list or generator) as input and output, they can be chained. This makes sense in some cases.
Data pipelines:
- list input with MemoryMapDataset - the dataset keeps the data modified by i_filters. The original data is unchanged. i_filters are applied once per record. This assures fast item access but a slow start: you have to wait until all data have passed through i_filters.
- generator input with MemoryMapDataset - the dataset iterates over the generator and keeps the result in memory. More memory is used, but faster access is possible. i_filters are applied once per record. This freezes the items taken from the generator if it uses randomness during generation.
- list input with MemoryIterableDataset - data access takes more time because i_filters are applied on each record access (for each epoch). Faster start: you don't have to wait until all data have passed through i_filters.
- generator input with MemoryIterableDataset - the generator output is modified by i_filters. Less memory is used. An infinite dataset is possible.
Example:
import torch
from ptls.data_load.datasets import MemoryMapDataset
from ptls.data_load.utils import collate_feature_dict
from ptls.data_load.iterable_processing import SeqLenFilter, FeatureRename

data_list = [
    {
        'mcc': torch.arange(seq_len),
        'id': f'user_{i}'
    }
    for i, seq_len in enumerate([4, 3, 6, 2, 8, 3, 5, 4])
]
dataset = MemoryMapDataset(
    data=data_list,
    i_filters=[
        SeqLenFilter(min_seq_len=4),
        FeatureRename({'id': 'user_id'}),
    ]
)
dl = torch.utils.data.DataLoader(
    dataset=dataset,
    collate_fn=collate_feature_dict,
    batch_size=10,
)
for batch in dl:
    print(batch.payload, batch.seq_lens, sep='\n')
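The same dataset also accepts a generator as input (the second pipeline in the list above). A minimal sketch, reusing the data_gen generator defined earlier and assuming MemoryMapDataset exposes len like a list, as described above:
dataset = MemoryMapDataset(
    data=data_gen(100),
    i_filters=[SeqLenFilter(min_seq_len=5)],
)
# the generator is consumed once; the filtered records are kept in memory
print(len(dataset))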
Parquet file read
For large amounts of data, pyspark is a possible engine to prepare the data and convert it into the feature dict format.
See demo/pyspark-parquet.ipynb for an example of data preprocessing with pyspark and parquet file preparation.
ptls.data_load.datasets.ParquetDataset is a dataset which reads parquet files with feature dicts.
ptls.data_load.datasets.ParquetDataset:
- implements an iterable dataset
- works correctly with a multiprocessing dataloader
- looks like a generator
- supports i_filters
You can feed ParquetDataset directly to a dataloader for the iterable way of usage.
You can combine ParquetDataset with MemoryMapDataset for the map way of usage, as in the sketch below.
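A minimal sketch of the map-style combination, where file_names is a hypothetical list of parquet file paths (see below for how to get it):
from ptls.data_load.datasets import ParquetDataset, MemoryMapDataset

# ParquetDataset streams records from the files; MemoryMapDataset stores them in memory
dataset = MemoryMapDataset(
    data=ParquetDataset(file_names),  # file_names: hypothetical list of parquet paths
)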
ParquetDataset requires parquet file names. Usually spark saves many parquet files for one dataset,
depending on the number of partitions.
You can get all file names with ptls.data_load.datasets.ParquetFiles or ptls.data_load.datasets.parquet_file_scan.
Having many files for one dataset allows you to:
- control the amount of data by reading more or fewer files
- split the data into train, valid and test parts (see the sketch below)
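A minimal sketch of a file-based split. It assumes parquet_file_scan takes a path (file, directory or glob) and returns a list of file names; check its docstring for the exact arguments. The 'data/train_trx.parquet' path and the two-file hold-out rule are illustrative only.
from ptls.data_load.datasets import ParquetDataset, parquet_file_scan

file_names = sorted(parquet_file_scan('data/train_trx.parquet'))  # hypothetical path with many part-files
# hold out the last two part-files for validation (illustrative split rule)
train_ds = ParquetDataset(file_names[:-2])
valid_ds = ParquetDataset(file_names[-2:])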
Persist dataset
ptls.data_load.datasets.PersistDataset stores the items from the source dataset in memory.
If your source data is an iterator (like a python generator or ParquetDataset),
all i_filters will be called each time you access the data.
Persist the data into memory and i_filters will be called only once.
More memory may be needed to store all dataset items,
but data access is faster.
A persisted iterator has a len and can be randomly accessed by index, as in the sketch below.
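A minimal sketch, reusing the data_gen generator from above; the generator is consumed once when PersistDataset is constructed:
from ptls.data_load.datasets import PersistDataset

dataset = PersistDataset(data=data_gen(100))  # items are materialised here, in the main process
print(len(dataset))   # number of stored records
print(dataset[0])     # random access by index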
Augmentations
Class ptls.data_load.datasets.AugmentationDataset is a way to apply augmentations.
Example:
from ptls.data_load.datasets import AugmentationDataset, PersistDataset, ParquetDataset
from ptls.data_load.augmentations import AllTimeShuffle, DropoutTrx

train_data = AugmentationDataset(
    f_augmentations=[
        AllTimeShuffle(),
        DropoutTrx(trx_dropout=0.01),
    ],
    data=PersistDataset(
        data=ParquetDataset(...),
    ),
)
Here we use the iterable ParquetDataset as the source and load it into memory with PersistDataset.
Then, each time we access the data, the two augmentation functions are applied to the items stored in the PersistDataset.
AugmentationDataset also works in iterable mode. The previous example then looks like this:
train_data = AugmentationDataset(
    f_augmentations=[
        AllTimeShuffle(),
        DropoutTrx(trx_dropout=0.01),
    ],
    data=ParquetDataset(...),
)
Classes and functions
See docstrings for classes:
- ptls.data_load.datasets.MemoryMapDataset
- ptls.data_load.datasets.MemoryIterableDataset
- ptls.data_load.datasets.ParquetFiles
- ptls.data_load.datasets.ParquetDataset
- ptls.data_load.datasets.PersistDataset
See docstrings for functions:
- ptls.data_load.datasets.parquet_file_scan