U-Net实现语音分离的代码分析
不是自己对着论文复现的,是网上找来自己改的
用到的包
- Python 3.5
- Chainer 3.0:一个柔性的神经网络框架,能够简单直观的写出复杂的网络。Chainer 对应地采用了一种叫做 “边定义边运行” 的机制, 即, 网络可以在实际进行前向计算的时候同时被定义。
- librosa 0.5.0:python的音频处理库
- cupy 2.0: 一个通过利用CUDA GPU库在Nvidia GPU上实现Numpy数组的库
代码
预处理:ProcessDSD.py
数据集处理,将DSD100数据集的音频文件转换为时频声谱。
DSD 包含两个文件夹,一个是混合音频的文件夹"Mixtures", 另一个是人声、鼓、贝司、其他乐器的分轨音频"Sources"。每个文件夹里包含两个子文件夹,"Dev" 是训练集,"Test"是测试集。
import numpy as np
from librosa.core import load
import util
import os
# Root folders of the DSD100 dataset. Index 0 is the "Dev" split and index 1
# the "Test" split; both splits are converted.
PATH_DSD_SOURCE = ["DSD100/Sources/Dev", "DSD100/Sources/Test"]
PATH_DSD_MIXTURE = ["DSD100/Mixtures/Dev", "DSD100/Mixtures/Test"]

# File names found inside every track folder.
FILE_MIX = "mixture.wav"
FILE_BASS = "bass.wav"
FILE_DRUMS = "drums.wav"
FILE_OTHER = "other.wav"
FILE_VOCAL = "vocals.wav"


def _list_track_dirs(roots):
    """Return the sorted paths of every entry directly under each of *roots*."""
    return sorted(os.path.join(root, name)
                  for root in roots
                  for name in os.listdir(root))


# Both lists are sorted so that the i-th mixture folder and the i-th source
# folder belong to the same track.
list_source_dir = _list_track_dirs(PATH_DSD_SOURCE)
list_mix_dir = _list_track_dirs(PATH_DSD_MIXTURE)

for mix_dir, source_dir in zip(list_mix_dir, list_source_dir):
    # os.path.basename instead of split("/"): the paths are built with
    # os.path.join, so the separator is not "/" on every platform.
    fname = os.path.basename(mix_dir)
    assert fname == os.path.basename(source_dir), \
        "mixture/source folders out of step: %s vs %s" % (mix_dir, source_dir)
    print("Processing: " + fname)

    # sr=None keeps each file's native sampling rate.
    y_mix, sr = load(os.path.join(mix_dir, FILE_MIX), sr=None)
    y_vocal, _ = load(os.path.join(source_dir, FILE_VOCAL), sr=None)
    # The accompaniment is the sum of the three non-vocal stems.
    y_inst = sum(load(os.path.join(source_dir, f), sr=None)[0]
                 for f in [FILE_DRUMS, FILE_BASS, FILE_OTHER])

    assert y_mix.shape == y_vocal.shape
    assert y_mix.shape == y_inst.shape

    # Forward the real sampling rate rather than relying on the 44100 default
    # of SaveSpectrogram.
    util.SaveSpectrogram(y_mix, y_vocal, y_inst, fname, original_sr=sr)
# Data augmentation: build 50 extra mixtures by combining a vocal, bass, drums
# and "other" stem that are each drawn from an independently chosen track.
# The index range follows the number of track folders actually found instead
# of a hard-coded 100, so a partial copy of the dataset does not raise
# IndexError.
n_tracks = len(list_source_dir)
rand_voc = np.random.randint(n_tracks, size=50)
rand_bass = np.random.randint(n_tracks, size=50)
rand_drums = np.random.randint(n_tracks, size=50)
rand_other = np.random.randint(n_tracks, size=50)

print("Generating random mix...")
for count, (i_voc, i_bass, i_drums, i_other) in enumerate(
        zip(rand_voc, rand_bass, rand_drums, rand_other), 1):
    # sr=None keeps the native sampling rate. The rate of the vocal stem is
    # forwarded below; the stems are assumed to share one rate -- TODO confirm.
    y_vocal, sr = load(os.path.join(list_source_dir[i_voc], FILE_VOCAL), sr=None)
    y_bass, _ = load(os.path.join(list_source_dir[i_bass], FILE_BASS), sr=None)
    y_drums, _ = load(os.path.join(list_source_dir[i_drums], FILE_DRUMS), sr=None)
    y_other, _ = load(os.path.join(list_source_dir[i_other], FILE_OTHER), sr=None)

    # Stems come from different tracks, so trim all of them to the shortest.
    minsize = min([y_vocal.size, y_bass.size, y_drums.size, y_other.size])
    y_vocal = y_vocal[:minsize]
    y_inst = y_bass[:minsize] + y_drums[:minsize] + y_other[:minsize]
    y_mix = y_vocal + y_inst

    fname = "dsd_random%02d" % count
    util.SaveSpectrogram(y_mix, y_vocal, y_inst, fname, original_sr=sr)
    print("Saved:" + fname)
主要用到util.SaveSpectrogram(y_mix, y_vocal, y_inst, fname)
程序总入口:DoExperiment.py
输入音频路径,训练模型或用现有模型,从原始音频获得分离的人声/分离的音频
"""
Code example for training U-Net
"""
import network
# util is used below, so it has to be imported in this snippet as well;
# without it the LoadDataset call raises NameError.
import util

# Load the precomputed (mixture, vocal) spectrogram pairs and train the U-Net
# on them for 30 epochs; the trained weights are written to "unet.model".
Xlist, Ylist = util.LoadDataset(target="vocal")
print("Dataset loaded.")
network.TrainUNet(Xlist, Ylist, savefile="unet.model", epoch=30)
"""
Code example for performing vocal separation with U-Net
"""
import util

fname = "Say Hello.mp3"
mag, phase = util.LoadAudio(fname)

# Only a 256-frame excerpt, starting at frame 1024, is separated.
start = 1024
end = 1024+256
mag_excerpt = mag[:, start:end]
phase_excerpt = phase[:, start:end]

# Soft mask (values in [0, 1]) estimated by the trained network.
mask = util.ComputeMask(mag_excerpt, unet_model="/Users/yanyingzi/Study/Signal Seperation/UNet-VocalSeparation-Chainer-master/unet.model", hard=False)

# Vocal = masked magnitude, accompaniment = complementary mask, plus the
# untouched excerpt for comparison. All three reuse the mixture phase.
outputs = (
    ("vocal-%s", mag_excerpt*mask),
    ("inst-%s", mag_excerpt*(1-mask)),
    ("orig-%s", mag_excerpt),
)
for name_template, magnitude in outputs:
    util.SaveAudio(name_template % fname, magnitude, phase_excerpt)
调用了五个主要接口:
util.LoadDataset(target)
加载数据集,target就是要分离的目标,这里为 "vocal"(人声)
network.TrainUNet(Xlist,Ylist,savefile="unet.model",epoch=30)
训练网络
util.LoadAudio(fname)
加载音频
util.ComputeMask(input_mag, unet_model="unet.model", hard=True)
计算掩码
util.SaveAudio(fname, mag, phase)
保存音频
(如果有现成的模型,则只需要调用后三个函数)
util.py
def LoadDataset(target="vocal"):
    """Load the precomputed magnitude spectrograms used for training.

    Reads at most the first 200 ``.npz`` files found under ``C.PATH_FFT``
    (the files written by ``SaveSpectrogram``).

    Args:
        target: ``"vocal"`` selects the vocal spectrogram as the training
            target; any other value selects the accompaniment (``"inst"``).

    Returns:
        ``(Xlist, Ylist)``: two lists of equal length. ``Xlist[i]`` is the
        mixture spectrogram and ``Ylist[i]`` the target spectrogram of the
        same file; both arrays have the same shape.
    """
    key = "vocal" if target == "vocal" else "inst"
    Xlist = []
    Ylist = []
    for file_fft in find_files(C.PATH_FFT, ext="npz")[:200]:
        # np.load returns an NpzFile that keeps the archive open; the context
        # manager closes it once the two arrays have been read, and each
        # array is read only once.
        with np.load(file_fft) as dat:
            mix = dat["mix"]
            wanted = dat[key]
        assert(mix.shape == wanted.shape)
        Xlist.append(mix)
        Ylist.append(wanted)
    return Xlist, Ylist
函数返回的Xlist
是混合音频的声谱,Ylist
是分轨的target音频的声谱。
find_files
是librosa.util.find_files
network.TrainUNet():训练网络,具体实现见后面的 network.py
def LoadAudio(fname):
    """Load an audio file and split its STFT into magnitude and phase.

    Args:
        fname: path of the audio file to read.

    Returns:
        ``(mag, phase)``: ``mag`` is the STFT magnitude scaled so that its
        maximum is 1 (left unscaled when the signal is entirely silent);
        ``phase`` is the unit-modulus complex factor ``exp(j*angle(spec))``.
        ``mag*phase`` therefore rebuilds the spectrogram up to the scaling
        constant.
    """
    # Resampled to C.SR on load (16000 according to the original notes).
    y, sr = load(fname, sr=C.SR)
    # C.FFT_SIZE = 1024 and C.H = 512 according to the original notes.
    spec = stft(y, n_fft=C.FFT_SIZE, hop_length=C.H, win_length=C.FFT_SIZE)
    mag = np.abs(spec)
    # Peak-normalise, but skip the division for an all-zero spectrogram so
    # that silent input does not turn into NaN.
    peak = np.max(mag)
    if peak > 0:
        mag /= peak
    phase = np.exp(1.j*np.angle(spec))
    return mag, phase
load
是librosa.core.load
, 加载各种格式的音频
- 参数为:
path:音频路径
sr:采样率(可以不用音频原始的采样率,load 带有重采样功能)
mono:该值为true时候是单通道、否则为双通道
offset:读音频的开始时间,也就是可以不从头开始读取音频
duration:持续时间,可以不加载全部时间,通过与offset合作读取其中一段音频
dtype:返回的音频信号值的数据格式,一般不设置
res_type:重采样的方式,一般不用
- 返回值:
y:音频的信号值,是个numpy一维数组
sr:音频的采样率;如果参数 sr 设为 None,返回的是原始采样率,否则返回重采样后的采样率
stft
是librosa.core.stft
,短时傅里叶变换(STFT)
- 参数为:
y:信号值
n_fft: 每个傅里叶窗口包含的样本量(建议为2的指数)
hop_length: 步长,每帧之间的采样数
win_length: 窗长,小于等于n_fft(默认 win_length = n_fft,使用整帧)
window: 窗类型,汉明窗等
center: 特征是信号的中心还是起始点
dtype: 返回的音频信号值的数据格式,一般不设置
pad_mode: 对信号边缘进行填充(padding)的方式
- 返回值:
D:短时傅里叶变换后的矩阵(时频声谱)D.shape=(number of frequency bins , number of time frames)=(1 + n_fft/2, n_frames)
(没想通为什么 number of frequency bins = 1 + n_fft/2 )
(网上的解释:The continuous Fourier transform possesses symmetries when computed on real signals (Hermitian symmetry). The discrete version, an FFT (of even length) possesses a slightly twisted symmetry. 就是说实信号的FFT结果在频率上是共轭对称的,正、负频率部分是 "duplicated" 的,所以只需要保留 0 到 n_fft/2 这 1 + n_fft/2 个非负频率点。)
(phase也看不懂)
mag
是magnitude,做了绝对值
phase = np.exp(1.j*np.angle(spec))
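np.angle(spec) 返回 spec 每个元素的相位角,np.exp(1.j*相位角) 再把它变成模为 1 的复数(单位相位因子)。因此 mag*phase 可以还原出复数声谱(只相差归一化时除掉的常数),SaveAudio 里的 istft 就是用 mag*phase 重建波形的。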
def SaveAudio(fname, mag, phase):
    """Rebuild a waveform from magnitude and phase and write it to *fname*.

    Args:
        fname: path of the audio file to write.
        mag: magnitude spectrogram.
        phase: complex phase factor with the same shape as *mag*.
    """
    # Element-wise product restores the complex spectrogram.
    complex_spec = mag*phase
    waveform = istft(complex_spec, hop_length=C.H, win_length=C.FFT_SIZE)
    write_wav(fname, waveform, C.SR, norm=True)
istft
是librosa.core.istft
write_wav
是librosa.output.write_wav
def _magnitude_spectrogram(y, original_sr):
    """Resample *y* from *original_sr* to C.SR; return its float32 STFT magnitude."""
    y = resample(y, original_sr, C.SR)
    return np.abs(
        stft(y, n_fft=C.FFT_SIZE, hop_length=C.H)).astype(np.float32)


def SaveSpectrogram(y_mix, y_vocal, y_inst, fname, original_sr=44100):
    """Convert one track to magnitude spectrograms and store them as ``.npz``.

    The three spectrograms are divided by the maximum of the *mixture*
    spectrogram, so they stay on a common scale.

    Args:
        y_mix: waveform of the full mixture.
        y_vocal: waveform of the vocal stem.
        y_inst: waveform of the accompaniment.
        fname: base name of the output file; ``fname + ".npz"`` is written
            under ``C.PATH_FFT`` with the keys ``mix``, ``vocal``, ``inst``.
        original_sr: sampling rate of the three input waveforms.
    """
    S_mix = _magnitude_spectrogram(y_mix, original_sr)
    S_vocal = _magnitude_spectrogram(y_vocal, original_sr)
    S_inst = _magnitude_spectrogram(y_inst, original_sr)

    norm = S_mix.max()
    # A completely silent mixture has norm == 0; dividing would fill the
    # saved arrays with NaN/inf, so the scaling is skipped in that case.
    if norm > 0:
        S_mix /= norm
        S_vocal /= norm
        S_inst /= norm

    np.savez(os.path.join(C.PATH_FFT, fname+".npz"),
             mix=S_mix, vocal=S_vocal, inst=S_inst)
stft
是librosa.core.stft
,短时傅里叶变换。SaveSpectrogram这个函数是处理数据集时用的,将数据集里的音频转化为声谱。
def ComputeMask(input_mag, unet_model="unet.model", hard=True):
    """Estimate a separation mask for a magnitude spectrogram.

    Args:
        input_mag: 2-D magnitude spectrogram (frequency bins x frames).
        unet_model: path of the saved U-Net weights to load.
        hard: when true, return a binary mask (1 where the network output
            exceeds 0.5, else 0); otherwise return the raw network output.

    Returns:
        A float32 mask with the same shape as *input_mag*; its first
        (lowest-frequency) row is always zero.
    """
    model = network.UNet()
    model.load(unet_model)

    # Switch Chainer to inference mode. Note that this changes the global
    # configuration and is not restored afterwards.
    config.train = False
    config.enable_backprop = False

    # The network never sees the first frequency row; add batch and channel
    # axes in front of the remaining rows.
    net_input = input_mag[np.newaxis, np.newaxis, 1:, :]
    soft_mask = model(net_input).data[0, 0, :, :]

    # Put a zero row back in place of the dropped one so the mask lines up
    # with input_mag again.
    zero_row = np.zeros((1, soft_mask.shape[1]), dtype="float32")
    soft_mask = np.vstack((zero_row, soft_mask))

    if not hard:
        return soft_mask
    return (soft_mask > 0.5).astype("float32")
network.UNet()
调用U-Net神经网络,用训练好的模型来计算 hard mask 或 soft mask。
network.py
UNet class
from chainer import Chain, serializers, optimizers, cuda, config
import chainer.links as L
import chainer.functions as F
import numpy as np
#import const
#cp = cuda.cupy
class UNet(Chain):
    """U-Net that maps a magnitude spectrogram to a mask with values in (0, 1).

    Encoder: six stride-2 convolutions (kernel 4, padding 1), each followed
    by batch normalisation and leaky ReLU; the channel count grows
    1 -> 16 -> 32 -> 64 -> 128 -> 256 -> 512.

    Decoder: six stride-2 deconvolutions mirroring the encoder. From the
    second decoder layer on, the input is the previous decoder output
    concatenated with the encoder output of matching resolution (skip
    connection), which is why those layers take twice as many input
    channels. The first three decoder layers use dropout; the last layer
    ends in a sigmoid.

    The input is a 4-D array (batch, 1, frequency, time). Both spatial sizes
    presumably need to be multiples of 64 so that the skip connections line
    up -- the training code uses 512 frequency rows.
    """

    def __init__(self):
        super(UNet, self).__init__()
        with self.init_scope():
            # Arguments: in_channels, out_channels, ksize, stride, pad.
            self.conv1 = L.Convolution2D(1, 16, 4, 2, 1)
            self.norm1 = L.BatchNormalization(16)
            self.conv2 = L.Convolution2D(16, 32, 4, 2, 1)
            self.norm2 = L.BatchNormalization(32)
            self.conv3 = L.Convolution2D(32, 64, 4, 2, 1)
            self.norm3 = L.BatchNormalization(64)
            self.conv4 = L.Convolution2D(64, 128, 4, 2, 1)
            self.norm4 = L.BatchNormalization(128)
            self.conv5 = L.Convolution2D(128, 256, 4, 2, 1)
            self.norm5 = L.BatchNormalization(256)
            self.conv6 = L.Convolution2D(256, 512, 4, 2, 1)
            self.norm6 = L.BatchNormalization(512)
            # Decoder input channels are doubled from deconv2 on because of
            # the concatenated skip connections.
            self.deconv1 = L.Deconvolution2D(512, 256, 4, 2, 1)
            self.denorm1 = L.BatchNormalization(256)
            self.deconv2 = L.Deconvolution2D(512, 128, 4, 2, 1)
            self.denorm2 = L.BatchNormalization(128)
            self.deconv3 = L.Deconvolution2D(256, 64, 4, 2, 1)
            self.denorm3 = L.BatchNormalization(64)
            self.deconv4 = L.Deconvolution2D(128, 32, 4, 2, 1)
            self.denorm4 = L.BatchNormalization(32)
            self.deconv5 = L.Deconvolution2D(64, 16, 4, 2, 1)
            self.denorm5 = L.BatchNormalization(16)
            self.deconv6 = L.Deconvolution2D(32, 1, 4, 2, 1)

    def __call__(self, X):
        """Run the network on *X* and return the predicted mask.

        The shape of every intermediate activation is printed, as in the
        original walkthrough. ``print(...)`` with a single argument is used
        so the code runs on Python 3 as well as Python 2.
        """
        print(X.shape)
        # Encoder
        h1 = F.leaky_relu(self.norm1(self.conv1(X)))
        print(h1.shape)
        h2 = F.leaky_relu(self.norm2(self.conv2(h1)))
        print(h2.shape)
        h3 = F.leaky_relu(self.norm3(self.conv3(h2)))
        print(h3.shape)
        h4 = F.leaky_relu(self.norm4(self.conv4(h3)))
        print(h4.shape)
        h5 = F.leaky_relu(self.norm5(self.conv5(h4)))
        print(h5.shape)
        h6 = F.leaky_relu(self.norm6(self.conv6(h5)))
        print(h6.shape)
        # Decoder with skip connections
        dh = F.relu(F.dropout(self.denorm1(self.deconv1(h6))))
        print(dh.shape)
        dh = F.relu(F.dropout(self.denorm2(self.deconv2(F.concat((dh, h5))))))
        print(dh.shape)
        dh = F.relu(F.dropout(self.denorm3(self.deconv3(F.concat((dh, h4))))))
        print(dh.shape)
        dh = F.relu(self.denorm4(self.deconv4(F.concat((dh, h3)))))
        print(dh.shape)
        dh = F.relu(self.denorm5(self.deconv5(F.concat((dh, h2)))))
        print(dh.shape)
        dh = F.sigmoid(self.deconv6(F.concat((dh, h1))))
        print(dh.shape)
        return dh

    def load(self, fname="unet.model"):
        """Load the network parameters from the npz file *fname*."""
        serializers.load_npz(fname, self)

    def save(self, fname="unet.model"):
        """Save the network parameters to the npz file *fname*."""
        serializers.save_npz(fname, self)
TrainUNet:
class UNetTrainmodel(Chain):
    """Training wrapper that turns a U-Net into a loss function.

    The loss is the mean absolute error between the masked mixture
    (``X * unet(X)``) and the target spectrogram ``Y``.
    """

    def __init__(self, unet):
        super(UNetTrainmodel, self).__init__()
        with self.init_scope():
            self.unet = unet

    def __call__(self, X, Y):
        """Compute the loss for a batch, keep it in ``self.loss``, return it."""
        predicted_mask = self.unet(X)
        masked_mixture = X*predicted_mask
        self.loss = F.mean_absolute_error(masked_mixture, Y)
        return self.loss
def TrainUNet(Xlist, Ylist, epoch=40, savefile="unet.model"):
    """Train a U-Net on random spectrogram patches and save it after each epoch.

    Every batch holds ``const.BATCH_SIZE`` patches of ``const.PATCH_LENGTH``
    frames, each cut at a random position from a randomly chosen track. The
    first frequency row is dropped, which leaves 512 rows. Training runs on
    GPU 0 with the Adam optimiser.

    Args:
        Xlist: mixture magnitude spectrograms (frequency bins x frames).
        Ylist: target spectrograms; ``Ylist[i]`` belongs to ``Xlist[i]``.
        epoch: number of epochs.
        savefile: path the model is written to at the end of every epoch.

    Raises:
        ValueError: if a spectrogram is too short to cut a patch from.
    """
    # Bound locally because the module-level `import const` and
    # `cp = cuda.cupy` lines are commented out; without these two names the
    # function fails with NameError.
    import const
    cp = cuda.cupy

    assert(len(Xlist) == len(Ylist))

    patch_length = const.PATCH_LENGTH
    batch_size = const.BATCH_SIZE

    itemcnt = len(Xlist)
    itemlength = [x.shape[1] for x in Xlist]
    # np.random.randint(n) needs n > 0. Check once up front instead of
    # crashing at a random point in training when a short track is drawn.
    for length in itemlength:
        if length - patch_length - 1 <= 0:
            raise ValueError(
                "spectrogram with %d frames is too short for patches of "
                "%d frames" % (length, patch_length))

    unet = UNet()
    model = UNetTrainmodel(unet)
    model.to_gpu(0)
    opt = optimizers.Adam()
    opt.setup(model)
    config.train = True
    config.enable_backprop = True

    # Batches per epoch: four times the number needed to cover all frames.
    subepoch = sum(itemlength) // patch_length // batch_size * 4
    for ep in range(epoch):
        sum_loss = 0.0
        for subep in range(subepoch):
            X = np.zeros((batch_size, 1, 512, patch_length),
                         dtype="float32")
            Y = np.zeros((batch_size, 1, 512, patch_length),
                         dtype="float32")
            idx_item = np.random.randint(0, itemcnt, batch_size)
            for i in range(batch_size):
                item = idx_item[i]
                randidx = np.random.randint(
                    itemlength[item]-patch_length-1)
                # Row 0 is skipped so that 512 frequency rows remain.
                X[i, 0, :, :] = \
                    Xlist[item][1:, randidx:randidx+patch_length]
                Y[i, 0, :, :] = \
                    Ylist[item][1:, randidx:randidx+patch_length]
            opt.update(model, cp.asarray(X), cp.asarray(Y))
            sum_loss += model.loss.data * batch_size

        # Note: the printed value is the loss summed over the epoch.
        print("epoch: %d/%d  loss=%.3f" % (ep+1, epoch, sum_loss))
        unet.save(savefile)
总结
U-Net其实就是输入空间与输出空间相同的一个映射。在训练时,输入是(MixSpec, TargetSpec) = (混合声谱,目标声谱),用Adam方法优化参数θ,得到 mask = UNet_θ(MixSpec),使 MixSpec*mask 尽可能逼近 TargetSpec(损失函数是两者的平均绝对误差)。