在處理監督機器學習任務時,最重要的東西是數據——而且是大量的數據。當面對少量數據時,特別是需要深度神經網絡的任務時,該怎么辦?如何創建一個快速高效的數據管道來生成更多的數據,從而在不花費數百美元在昂貴的云GPU單元上的情況下進行深度神經網絡的訓練?
這是我們在MAFAT雷達分類競賽中遇到的一些問題。我的隊友hezi hershkovitz為生成更多訓練數據而進行的增強,以及我們首次嘗試使用數據加載器在飛行中生成這些數據。
要解決的問題
我們在比賽中使用數據管道也遇到了一些問題,主要涉及速度和效率:
它沒有利用Numpy和Pandas在Python中提供的快速矢量化操作的優勢
每個批次所需的信息都首先編寫并存儲為字典,然后使用Python for循環在getitem方法中進行訪問,從而導致迭代和處理速度緩慢。
從音軌生成"移位的"片段會導致每次檢索新片段時都重新構建相同的音軌,這也會減緩管道的速度。
管道無法處理2D或3D輸入,因為我們同時使用了scalograms和spectrograms但是無法處理。
如果我們簡單地按照批處理的方式進行所有的移位和翻轉,那么批處理中就會充斥著與其他示例過于相似的示例,從而使模型不能很好地泛化。
這些低效率的核心原因是,管道是以分段作為基本單元運行,而不是在音軌上運行。
數據格式概述
在制作我們的流數據之前,先再次介紹一下數據集,MAFAT數據由多普勒雷達信號的固定長度段組成,表示為128x32 I / Q矩陣; 但是,在數據集中,有許多段屬于同一磁道,即,雷達信號持續時間較長,一條磁道中有1到43個段。
上面的圖像來自hezi hershkovitz 的文章,并顯示了一個完整的跟蹤訓練數據集時,結合所有的片段。紅色的矩形是包含在這條軌跡中的單獨的部分。白點是"多普勒脈沖",代表被跟蹤物體的質心。
借助"多普勒脈沖"白點,我們可以很容易地看到,航跡是由相鄰的段組成的,即段id 1942之后是1943,然后是1944,等等。
片段相鄰的情況下允許我們使用移位來創建"新的"樣本。
但是,由于每個音軌由不同數量的片段組成,因此從任何給定音軌生成的增補數目都會不同,這使我們無法使用常規的Pytorch Dataset 類。 這里就需要依靠Pytorch中的IterableDataset 類從每個音軌生成數據流。
數據流管道設計
這三個對象的高級目標是創建一個_Segment對象流,它能夠足夠靈活地處理音軌和段,并且在代碼中提供一致的語義:
class _Segment(Dict, ABC):
segment_id: Union[int, str]
output_array: np.ndarray
doppler_burst: np.ndarray
target_type: np.ndarray
segment_count: int
為此,我們創建了:
一個配置類,它將為一個特定的實驗保存所有必要的超參數和環境變量——這實際上只是一個具有預定義鍵的簡單字典。
一個DataDict類,它處理原始片段的加載,驗證每一條軌跡,創建子軌跡以防止數據泄漏,并將數據轉換為正確的格式,例如2D或3D,并為擴展做好準備
StreamingDataset類,是Pytorch IterableDataset的子類,處理模型的擴充和流段。
config = Config(file_path=PATH_DATA,
num_tracks=3,
valratio=6,
get_shifts=True,
output_data_type='spectrogram',
get_horizontal_flip=True,
get_vertical_flip=True,
mother_wavelet='cgau1',
wavelet_scale=3,
batch_size=50,
tracks_in_memory=25,
include_doppler=True,
shift_segment=2)dataset = DataDict(config=config)
train_dataset = StreamingDataset(dataset.train_data, config, shuffle=True)
train_loader = DataLoader(train_dataset,batch_size=config['batch_size'])
DataDict實現
在DataDict中將片段處理為音軌,然后再處理為片段,為加速代碼提供了很好的機會,特別是在數據驗證、重新分割和軌創建都可以向量化的情況下。
我們使用了Numpy和Pandas中的一堆技巧和簡潔的特性,大量使用了布爾矩陣來進行驗證,并將scalogram/spectrogram 圖轉換應用到音軌中連接的片段上。代碼太長,但你可以去最后的源代碼地址中查看一下DataDict createtrackobjects方法。
生成細分流
一旦將數據集轉換為軌跡,下一個問題就是以更快的方式進行拆分和移動。 在這里,Numpy提供了執行快速的,基于矩陣的操作和從一條軌跡快速生成一組新的片段所需的所有工具。
def split_Nd_array(array: np.ndarray, nsplits: int) -> List[np.ndarray]:
if array.ndim == 1:
indices = range(0, len(array) - 31, nsplits)
segments = [np.take(array, np.arange(i, i + 32), axis=0).copy() for i in indices]
else:
indices = range(0, array.shape[1] - 31, nsplits)
segments = [np.take(array, np.arange(i, i + 32), axis=1).copy() for i in indices]
return segments
def create_new_segments_from_splits(segment: _Segment, nsplits: int) -> List[_Segment]:
new_segments = []
if segment['output_array'].shape[1] > 32:
output_array = split_Nd_array(array=segment['output_array'], nsplits=nsplits)
bursts = split_Nd_array(array=segment['doppler_burst'], nsplits=nsplits)
new_segments.extend([_Segment(segment_id=f'{segment["segment_id"]}_{j}',
output_array=array,
doppler_burst=bursts[j],
target_type=segment['target_type'],
segment_count=1)
for j, array in enumerate(output_array)])
else:
new_segments.Append(segment)
return new_segments
Pytorch Iterable數據集
注:
torch.utils.data.IterableDataset 是 PyTorch 1.2中新的數據集類
一旦音軌再次被分割成段,我們需要編寫一個函數,每次增加一個音軌,并將新生成的段發送到流中,從流中從多個音軌生成成批的段。最后一點對于確保每個批的數據分布合理是至關重要的。
生成流數據集正是IterableDataset類的工作。 它與Pytorch中的經典(Map)Dataset類的區別在于,對于IterableDataset,DataLoader調用next(iterable_Dataset),直到它構建了一個完整的批處理,而不是實現一個接收映射到數據集中某個項的索引的方法。
創建批次
在這個例子的基礎上,我們創建了一個實現,它的核心進程是"processtracksshuffle",以確保DataLoader提供的每個批處理都包含來自多個音軌的段的良好混合。我們通過設置tracksinmemory超參數來實現這一點,該參數允許我們調整在生成新的流之前將處理多少條音軌并將其保存到工作內存中。
def segments_generator(self, segment_list: _Segment) -> None:
"""
Generates original and augmented segments from a track.
"""
if self.config.get('get_shifts'):
segment_list = create_new_segments_from_splits(segment_list, nsplits=self.config['shift_segment'])
else:
segment_list = create_new_segments_from_splits(segment_list, nsplits=32)
if self.config.get('get_vertical_flip'):
flips = create_flipped_segments(segment_list, flip_type='vertical')
segment_list.extend(flips)
if self.config.get('get_horizontal_flip'):
flips = create_flipped_segments(segment_list, flip_type='horizontal')
segment_list.extend(flips)
for segment in segment_list:
if self.config['output_data_type'] == 'scalogram':
segment.assert_valid_scalogram()
else:
segment.assert_valid_spectrogram()
self.segment_blocks.extend(segment_list)
random.shuffle(self.segment_blocks)
def process_tracks_shuffle(self):
for i, track in enumerate(self.data):
self.segments_generator(track)
if i % self.config.get('tracks_in_memory', 100) == self.config.get('tracks_in_memory', 100):
segment_blocks = self.segment_blocks
self.segment_blocks = []
random.shuffle(segment_blocks)
yield segment_blocks
segment_blocks = self.segment_blocks
self.segment_blocks = []
random.shuffle(segment_blocks)
yield segment_blocks
def shuffle_stream(self):
return chain(self.process_tracks_shuffle())
# def linear_stream(self):
# return chain(self.segments_generator(track) for track in self.data)
def __iter__(self):
for segments in chain(self.shuffle_stream()):
yield from segments
并行化
在進一步加速數據處理方面,我們沒有利用通過在多個GPU并行化的處理來生成多個流。不過需要記住的一件事是,IterableDataset的并行化并不像標準Dataset類那樣簡單,因為僅僅用IterableDataset添加workers會導致每個worker獲得數據的底層完整副本。
結論
在Pytorch中學習使用流數據是一次很好的學習經歷,也是一次很好的編程挑戰。這里通過改變我們對pytorch傳統的dataset的組織的概念的理解,開啟一種更有效地處理數據的方式。
眾所周知,我們80%的時間都花在了數據清理和管道建立上。然而,我們不應將數據處理視為必須處理而又經常被忽略的工作,而去深入研究20%建模的"樂趣"。我們而應將管道和處理視為一個同樣具有樂趣和關鍵性的工作。因為這是必要的,因為管道速度越快,運行的實驗就越多,數據處理的越好,得到的結果就會越好。
作者:Adam Cohn
deephub翻譯組
代碼地址:
github/ShaulSolomon/sota-mafat-radar/