U-Net实现语音分离的代码分析

2019-10-15  本文已影响0人  莹子说她想吃烤冷面

不是自己对着论文复现的,是网上找来自己改的

用到的包

代码

预处理:ProcessDSD.py

数据集处理,将DSD100数据集的音频文件转换为时频声谱。
DSD 包含两个文件夹,一个是混合音频的文件夹"Mixtures", 另一个是人声、鼓、贝司、其他乐器的分轨音频"Sources"。每个文件夹里包含两个子文件夹,"Dev" 是训练集,"Test"是测试集。

import numpy as np
from librosa.core import load
import util
import os

PATH_DSD_SOURCE = ["DSD100/Sources/Dev", "DSD100/Sources/Test"]
PATH_DSD_MIXTURE = ["DSD100/Mixtures/Dev", "DSD100/Mixtures/Test"]

FILE_MIX = "mixture.wav"
FILE_BASS = "bass.wav"
FILE_DRUMS = "drums.wav"
FILE_OTHER = "other.wav"
FILE_VOCAL = "vocals.wav"


list_source_dir = [os.path.join(PATH_DSD_SOURCE[0], f)
                   for f in os.listdir(PATH_DSD_SOURCE[0])]
list_source_dir.extend([os.path.join(PATH_DSD_SOURCE[1], f)
                        for f in os.listdir(PATH_DSD_SOURCE[1])])
list_source_dir = sorted(list_source_dir)

list_mix_dir = [os.path.join(PATH_DSD_MIXTURE[0], f)
                for f in os.listdir(PATH_DSD_MIXTURE[0])]
list_mix_dir.extend([os.path.join(PATH_DSD_MIXTURE[1], f)
                     for f in os.listdir(PATH_DSD_MIXTURE[1])])
list_mix_dir = sorted(list_mix_dir)


for mix_dir, source_dir in zip(list_mix_dir,  list_source_dir):
    assert(mix_dir.split("/")[-1] == source_dir.split("/")[-1])
    fname = mix_dir.split("/")[-1]
    print("Processing: " + fname)
    y_mix, sr = load(os.path.join(mix_dir, FILE_MIX), sr=None)
    y_vocal, _ = load(os.path.join(source_dir, FILE_VOCAL), sr=None)
    y_inst = sum([load(os.path.join(source_dir, f), sr=None)[0]
                  for f in [FILE_DRUMS, FILE_BASS, FILE_OTHER]])

    assert(y_mix.shape == y_vocal.shape)
    assert(y_mix.shape == y_inst.shape)

    util.SaveSpectrogram(y_mix, y_vocal, y_inst, fname)


rand_voc = np.random.randint(100, size=50)
rand_bass = np.random.randint(100, size=50)
rand_drums = np.random.randint(100, size=50)
rand_other = np.random.randint(100, size=50)

count = 1
print("Generating random mix...")
for i_voc, i_bass, i_drums, i_other in \
        zip(rand_voc, rand_bass, rand_drums, rand_other):
    y_vocal, _ = load(os.path.join(list_source_dir[i_voc], FILE_VOCAL), sr=None)
    y_bass, _ = load(os.path.join(list_source_dir[i_bass], FILE_BASS), sr=None)
    y_drums, _ = load(os.path.join(list_source_dir[i_drums], FILE_DRUMS), sr=None)
    y_other, _ = load(os.path.join(list_source_dir[i_other], FILE_OTHER), sr=None)

    minsize = min([y_vocal.size, y_bass.size, y_drums.size, y_other.size])

    y_vocal = y_vocal[:minsize]
    y_inst = y_bass[:minsize] + y_drums[:minsize] + y_other[:minsize]
    y_mix = y_vocal + y_inst

    fname = "dsd_random%02d" % count
    util.SaveSpectrogram(y_mix, y_vocal, y_inst, fname)
    print("Saved:" + fname)
    count += 1

主要用到util.SaveSpectrogram(y_mix, y_vocal, y_inst, fname)

程序总入口:DoExperiment.py

输入音频路径,训练模型或用现有模型,从原始音频获得分离的人声/分离的音频


"""
Code example for training U-Net
"""
import network

Xlist,Ylist = util.LoadDataset(target="vocal")
print("Dataset loaded.")
network.TrainUNet(Xlist,Ylist,savefile="unet.model",epoch=30)


"""
Code example for performing vocal separation with U-Net
"""
import util

fname = "Say Hello.mp3"
mag, phase = util.LoadAudio(fname)
start = 1024
end = 1024+256

mask = util.ComputeMask(mag[:, start:end], unet_model="/Users/yanyingzi/Study/Signal Seperation/UNet-VocalSeparation-Chainer-master/unet.model", hard=False)

util.SaveAudio(
    "vocal-%s" % fname, mag[:, start:end]*mask, phase[:, start:end])
util.SaveAudio(
    "inst-%s" % fname, mag[:, start:end]*(1-mask), phase[:, start:end])
util.SaveAudio(
    "orig-%s" % fname, mag[:, start:end], phase[:, start:end])

调用了五个主要接口:

util.LoadDataset(target) 加载数据集,target就是要分离的目标,这里为“voice”
network.TrainUNet(Xlist,Ylist,savefile="unet.model",epoch=30) 训练网络
util.LoadAudio(fname) 加载音频
util.ComputeMask(input_mag, unet_model="unet.model", hard=True)计算掩码
util.SaveAudio(fname, mag, phase) 保存音频
(如果有现成的模型,则只需要调用后三个函数)

util.py

def LoadDataset(target="vocal"): # 加载前面处理好的数据集的时频声谱
    filelist_fft = find_files(C.PATH_FFT, ext="npz")[:200]
    Xlist = []
    Ylist = []
    for file_fft in filelist_fft:
        dat = np.load(file_fft)
        Xlist.append(dat["mix"])
        if target == "vocal":
            assert(dat["mix"].shape == dat["vocal"].shape)
            Ylist.append(dat["vocal"])
        else:
            assert(dat["mix"].shape == dat["inst"].shape)
            Ylist.append(dat["inst"])
    return Xlist, Ylist

函数返回的Xlist是混合音频的声谱,Ylist是分轨的target音频的声谱。
find_fileslibrosa.util.find_files

def network.TrainUNet(): # 见后面network.py
def LoadAudio(fname):
    y, sr = load(fname, sr=C.SR) #sr: sampling rate, C.SR = 16000
    spec = stft(y, n_fft=C.FFT_SIZE, hop_length=C.H, win_length=C.FFT_SIZE) # C.FFT_SIZE = 1024,C.H = 512
    mag = np.abs(spec)
    mag /= np.max(mag) # 标准化
    phase = np.exp(1.j*np.angle(spec))
    return mag, phase

loadlibrosa.core.load, 加载各种格式的音频

stftlibrosa.core.stft,短时傅里叶变化

(没想通为什么 number of frequency bins = 1 + n_fft/2 )
网上的解释:The continuous Fourier transform possesses symmetries when computed on real signals (Hermitian symmetry). The discrete version, an FFT (of even length) possesses a slighty twisted symmetry.就是说FFT计算出来的结果是频率上对称的,存在"duplicated" in positive and negative frequencies.)
(phase也看不懂)

mag 是magnitude,做了绝对值
phase = np.exp(1.j*np.angle(spec)) 返回spec的复数角度

def SaveAudio(fname, mag, phase):
    y = istft(mag*phase, hop_length=C.H, win_length=C.FFT_SIZE)
    write_wav(fname, y, C.SR, norm=True)

istftlibrosa.core.istft
write_wavlibrosa.output.write_wav

def SaveSpectrogram(y_mix, y_vocal, y_inst, fname, original_sr=44100):
    y_mix = resample(y_mix, original_sr, C.SR)
    y_vocal = resample(y_vocal, original_sr, C.SR)
    y_inst = resample(y_inst, original_sr, C.SR)

    S_mix = np.abs(
        stft(y_mix, n_fft=C.FFT_SIZE, hop_length=C.H)).astype(np.float32)
    S_vocal = np.abs(
        stft(y_vocal, n_fft=C.FFT_SIZE, hop_length=C.H)).astype(np.float32)
    S_inst = np.abs(
        stft(y_inst, n_fft=C.FFT_SIZE, hop_length=C.H)).astype(np.float32)

    norm = S_mix.max()
    S_mix /= norm
    S_vocal /= norm
    S_inst /= norm

    np.savez(os.path.join(C.PATH_FFT, fname+".npz"),
             mix=S_mix, vocal=S_vocal, inst=S_inst)

stftlibrosa.core.stft,短时傅里叶变化。SaveSpectrogram这个函数是处理数据集时用的,将数据集里的音频转化为声谱。

def ComputeMask(input_mag, unet_model="unet.model", hard=True):
    unet = network.UNet()
    unet.load(unet_model)
    config.train = False
    config.enable_backprop = False
    mask = unet(input_mag[np.newaxis, np.newaxis, 1:, :]).data[0, 0, :, :]
    mask = np.vstack((np.zeros(mask.shape[1], dtype="float32"), mask))
    if hard:  # hard mask
        hard_mask = np.zeros(mask.shape, dtype="float32")
        hard_mask[mask > 0.5] = 1
        return hard_mask
    else:    # soft mask
        return mask

network.UNet() 调用U-Net神经网络,用训练好的模型来计算 hard mask 或 soft mask。

network.np

UNet class

from chainer import Chain, serializers, optimizers, cuda, config
import chainer.links as L
import chainer.functions as F
import numpy as np
#import const

#cp = cuda.cupy

class UNet(Chain):
    def __init__(self):
        super(UNet, self).__init__()
        with self.init_scope():
            self.conv1 = L.Convolution2D(1, 16, 4, 2, 1)
            self.norm1 = L.BatchNormalization(16)
            self.conv2 = L.Convolution2D(16, 32, 4, 2, 1)
            self.norm2 = L.BatchNormalization(32)
            self.conv3 = L.Convolution2D(32, 64, 4, 2, 1)
            self.norm3 = L.BatchNormalization(64)
            self.conv4 = L.Convolution2D(64, 128, 4, 2, 1)
            self.norm4 = L.BatchNormalization(128)
            self.conv5 = L.Convolution2D(128, 256, 4, 2, 1)
            self.norm5 = L.BatchNormalization(256)
            self.conv6 = L.Convolution2D(256, 512, 4, 2, 1)
            self.norm6 = L.BatchNormalization(512)
            self.deconv1 = L.Deconvolution2D(512, 256, 4, 2, 1)
            self.denorm1 = L.BatchNormalization(256)
            self.deconv2 = L.Deconvolution2D(512, 128, 4, 2, 1)
            self.denorm2 = L.BatchNormalization(128)
            self.deconv3 = L.Deconvolution2D(256, 64, 4, 2, 1)
            self.denorm3 = L.BatchNormalization(64)
            self.deconv4 = L.Deconvolution2D(128, 32, 4, 2, 1)
            self.denorm4 = L.BatchNormalization(32)
            self.deconv5 = L.Deconvolution2D(64, 16, 4, 2, 1)
            self.denorm5 = L.BatchNormalization(16)
            self.deconv6 = L.Deconvolution2D(32, 1, 4, 2, 1)

    def __call__(self, X):

        print X.shape

        h1 = F.leaky_relu(self.norm1(self.conv1(X)))
        print h1.shape
        h2 = F.leaky_relu(self.norm2(self.conv2(h1)))
        print h2.shape
        h3 = F.leaky_relu(self.norm3(self.conv3(h2)))
        print h3.shape
        h4 = F.leaky_relu(self.norm4(self.conv4(h3)))
        print h4.shape
        h5 = F.leaky_relu(self.norm5(self.conv5(h4)))
        print h5.shape
        h6 = F.leaky_relu(self.norm6(self.conv6(h5)))
        print h6.shape
        dh = F.relu(F.dropout(self.denorm1(self.deconv1(h6))))
        print dh.shape
        dh = F.relu(F.dropout(self.denorm2(self.deconv2(F.concat((dh, h5))))))
        print dh.shape
        dh = F.relu(F.dropout(self.denorm3(self.deconv3(F.concat((dh, h4))))))
        print dh.shape
        dh = F.relu(self.denorm4(self.deconv4(F.concat((dh, h3)))))
        print dh.shape
        dh = F.relu(self.denorm5(self.deconv5(F.concat((dh, h2)))))
        print dh.shape
        dh = F.sigmoid(self.deconv6(F.concat((dh, h1))))
        print dh.shape
        return dh

    def load(self, fname="unet.model"):
        serializers.load_npz(fname, self)

    def save(self, fname="unet.model"):
        serializers.save_npz(fname, self)

TrainUNet:

class UNetTrainmodel(Chain):
    def __init__(self, unet):
        super(UNetTrainmodel, self).__init__()
        with self.init_scope():
            self.unet = unet

    def __call__(self, X, Y):
        O = self.unet(X)
        self.loss = F.mean_absolute_error(X*O, Y)
        return self.loss


def TrainUNet(Xlist, Ylist, epoch=40, savefile="unet.model"):
    assert(len(Xlist) == len(Ylist))
    unet = UNet()
    model = UNetTrainmodel(unet)
    model.to_gpu(0)
    opt = optimizers.Adam()
    opt.setup(model)
    config.train = True
    config.enable_backprop = True
    itemcnt = len(Xlist)
    itemlength = [x.shape[1] for x in Xlist]
    subepoch = sum(itemlength) // const.PATCH_LENGTH // const.BATCH_SIZE * 4
    for ep in range(epoch):
        sum_loss = 0.0
        for subep in range(subepoch):
            X = np.zeros((const.BATCH_SIZE, 1, 512, const.PATCH_LENGTH),
                         dtype="float32")
            Y = np.zeros((const.BATCH_SIZE, 1, 512, const.PATCH_LENGTH),
                         dtype="float32")
            idx_item = np.random.randint(0, itemcnt, const.BATCH_SIZE)
            for i in range(const.BATCH_SIZE):
                randidx = np.random.randint(
                    itemlength[idx_item[i]]-const.PATCH_LENGTH-1)
                X[i, 0, :, :] = \
                    Xlist[idx_item[i]][1:, randidx:randidx+const.PATCH_LENGTH]
                Y[i, 0, :, :] = \
                    Ylist[idx_item[i]][1:, randidx:randidx+const.PATCH_LENGTH]
            opt.update(model, cp.asarray(X), cp.asarray(Y))
            sum_loss += model.loss.data * const.BATCH_SIZE

        print("epoch: %d/%d  loss=%.3f" % (ep+1, epoch, sum_loss))

    unet.save(savefile)

总结

U-Net其实就是输入空间与输出空间相同的一个映射。在训练时,输入是(MixSpec, TargetSpec) = (混合声谱,目标声谱),用Adam方法优化参数θ来得到一个mask = UNet_θ(MixSpec),MixSpec*mask使得尽可能逼近TargetSpec。

上一篇 下一篇

猜你喜欢

热点阅读