深度学习笔记(九)—— CNN-3
3.FaceNet
有关FaceNet与triplet loss的理论知识请同学们复习理论课有关章节。在这里,我们将用triplet loss训练一个resnet18网络,并用这个网络在mnist数据集上进行KNN分类,具体的,resnet18相当于一个特征提取器,用所有的训练集图片的特征拟合一个KNN分类器,利用这个KNN分类进行预测. 在3.1小节,将给出triplet loss的实现. 3.2小节将实现一个适用于triplet loss训练的resnet18网络. 3.3小节将实现随机选取triplet的dataset, 3.4、3.5小节将分别实现resnet18的训练与测试函数.
embedding size
FaceNet 的作用是将图像嵌入一个d维的空间,在这个d维空间里,同一类图像的特征之间相隔的近,不同类图像的特征之间相隔的远,这个d我们称之为embedding size
3.1 triplet loss
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.backends.cudnn as cudnn
import os
import torchvision
import torchvision.transforms as transforms
from torch.autograd import Variable,Function
class PairwiseDistance(Function):
'''
compute distance of the embedding features, p is norm, when p is 2, then return L2-norm distance
'''
def __init__(self, p):
super(PairwiseDistance, self).__init__()
self.norm = p
def forward(self, x1, x2):
eps = 1e-6 # in case of zeros
diff = torch.abs(x1 - x2) # subtraction
out = torch.pow(diff, self.norm).sum(dim=1) # square
return torch.pow(out + eps, 1. / self.norm) # L-p norm
class TripletLoss(Function):
'''
Triplet loss function.
loss = max(diatance(a,p) - distance(a,n) + margin, 0)
forward method:
args:
anchor, positive, negative
return:
triplet loss
'''
def __init__(self, margin, num_classes=10):
super(TripletLoss, self).__init__()
self.margin = margin
self.num_classes = num_classes
self.pdist = PairwiseDistance(2) # to calculate distance
def forward(self, anchor, positive, negative):
d_p = self.pdist.forward(anchor, positive) # distance of anchor and positive
d_n = self.pdist.forward(anchor, negative) # distance of anchor and negative
dist_hinge = torch.clamp(self.margin + d_p - d_n, min=0.0) # ensure loss is no less than zero
loss = torch.mean(dist_hinge)
return loss
3.2 resnet-18 for triplet loss
class BasicBlock(nn.Module):
'''
resnet basic block.
one block includes two conv layer and one residual
'''
expansion = 1
def __init__(self, in_planes, planes, stride=1):
super(BasicBlock, self).__init__()
self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
self.bn1 = nn.BatchNorm2d(planes)
self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=1, padding=1, bias=False)
self.bn2 = nn.BatchNorm2d(planes)
self.shortcut = nn.Sequential()
if stride != 1 or in_planes != self.expansion*planes:
self.shortcut = nn.Sequential(
nn.Conv2d(in_planes, self.expansion*planes, kernel_size=1, stride=stride, bias=False),
nn.BatchNorm2d(self.expansion*planes)
)
def forward(self, x):
out = F.relu(self.bn1(self.conv1(x)))
out = self.bn2(self.conv2(out))
out += self.shortcut(x)
out = F.relu(out)
return out
class ResNetTriplet(nn.Module):
def __init__(self, block, num_blocks, embedding_size=256, num_classes=10):
super(ResNetTriplet, self).__init__()
self.in_planes = 64
self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
self.bn1 = nn.BatchNorm2d(64)
# feature map size 32x32
self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
# feature map size 32x32
self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
# feature map size 16x16
self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
# feature map size 8x8
self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)
# feature map size 4x4
# as we use resnet basic block, the expansion is 1
self.linear = nn.Linear(512*block.expansion, embedding_size)
def _make_layer(self, block, planes, num_blocks, stride):
strides = [stride] + [1]*(num_blocks-1)
layers = []
for stride in strides:
layers.append(block(self.in_planes, planes, stride))
self.in_planes = planes * block.expansion
return nn.Sequential(*layers)
def l2_norm(self,input):
input_size = input.size()
buffer = torch.pow(input, 2)
normp = torch.sum(buffer, 1).add_(1e-10)
norm = torch.sqrt(normp)
_output = torch.div(input, norm.view(-1, 1).expand_as(input))
output = _output.view(input_size)
return output
def forward(self, x):
out = F.relu(self.bn1(self.conv1(x)))
out = self.layer1(out)
out = self.layer2(out)
out = self.layer3(out)
out = self.layer4(out)
out = F.avg_pool2d(out, 4)
out = out.view(out.size(0), -1)
out = self.linear(out)
# normalize the features, then we set margin easily
self.features = self.l2_norm(out)
# multiply by alpha = 10 as suggested in https://arxiv.org/pdf/1703.09507.pdf
alpha = 10
self.features = self.features * alpha
# here we get the 256-d features, next we use those features to make prediction
return self.features
def ResNet18(embedding_size=256, num_classes=10):
return ResNetTriplet(BasicBlock, [2,2,2,2], embedding_size, num_classes)
3.3 triplet dataloader
Question 3
仔细阅读下面代码,对pic_classes的作用进行思考,回答下面问题:下面选取triplet的方式是随机选取,若要改为选择指定类别选取,怎么修改?请写出修改后的两行代码。
Answer
Pic_classes is convenient for us to find the path of the image id in train.csv and test.csv. For example, the corresponding content of pic_classes['0'] is a list of ids of image with a number of 0.
In order to produce a triplet sample, we need to take two pictures from the category of pos_class (one anc and one pos) and one picture from neg_class.
# you need to assign value to selected_pos(neg)_class first
pos_class = selected_pos_class
neg_class = selected_neg_class
import numpy as np
import pandas as pd
import torch
from PIL import Image
from torch.utils.data import Dataset
class TripletFaceDataset(Dataset):
def __init__(self, root_dir, csv_name, num_triplets, transform = None):
'''
randomly select triplet,which means anchor,positive and negative are all selected randomly.
args:
root_dir : dir of data set
csv_name : dir of train.csv
num_triplets: total number of triplets
'''
self.root_dir = root_dir
self.df = pd.read_csv(csv_name)
self.num_triplets = num_triplets
self.transform = transform
self.training_triplets = self.generate_triplets(self.df, self.num_triplets)
@staticmethod
def generate_triplets(df, num_triplets):
def make_dictionary_for_pic_class(df):
'''
make csv to the format that we want
- pic_classes = {'class0': [class0_id0, ...], 'class1': [class1_id0, ...], ...}
'''
pic_classes = dict()
for idx, label in enumerate(df['class']):
if label not in pic_classes:
pic_classes[label] = []
pic_classes[label].append(df.iloc[idx, 0])
return pic_classes
triplets = []
classes = df['class'].unique()
pic_classes = make_dictionary_for_pic_class(df)
for _ in range(num_triplets):
'''
- randomly choose anchor, positive and negative images for triplet loss
- anchor and positive images in pos_class
- negative image in neg_class
- at least, two images needed for anchor and positive images in pos_class
- negative image should have different class as anchor and positive images by definition
'''
pos_class = np.random.choice(classes) # random choose positive class
neg_class = np.random.choice(classes) # random choose negative class
# if choose anchor
while len(pic_classes[pos_class]) < 2:
pos_class = np.random.choice(classes)
# if neg in the same class as anchor and pos
while pos_class == neg_class:
neg_class = np.random.choice(classes)
pos_name = df.loc[df['class'] == pos_class, 'name'].values[0] # get positive class's name
neg_name = df.loc[df['class'] == neg_class, 'name'].values[0] # get negative class's name
if len(pic_classes[pos_class]) == 2:
ianc, ipos = np.random.choice(2, size = 2, replace = False)
else:
# both anchor and positive images are in pos_class but are not the same image
ianc = np.random.randint(0, len(pic_classes[pos_class])) # random choose anchor
ipos = np.random.randint(0, len(pic_classes[pos_class])) # random choose positive
while ianc == ipos:
ipos = np.random.randint(0, len(pic_classes[pos_class]))
ineg = np.random.randint(0, len(pic_classes[neg_class])) # random choose negative
triplets.append([pic_classes[pos_class][ianc], pic_classes[pos_class][ipos], pic_classes[neg_class][ineg],
pos_class, neg_class, pos_name, neg_name])
return triplets
def __getitem__(self, idx):
anc_id, pos_id, neg_id, pos_class, neg_class, pos_name, neg_name = self.training_triplets[idx]
anc_img = os.path.join(self.root_dir, str(pos_name), str(anc_id) + '.png') # join the path of anchor
pos_img = os.path.join(self.root_dir, str(pos_name), str(pos_id) + '.png') # join the path of positive
neg_img = os.path.join(self.root_dir, str(neg_name), str(neg_id) + '.png') # join the path of nagetive
anc_img = Image.open(anc_img).convert('RGB') # open the anchor image
pos_img = Image.open(pos_img).convert('RGB') # open the positive image
neg_img = Image.open(neg_img).convert('RGB') # open the negative image
pos_class = torch.from_numpy(np.array([pos_class]).astype('long')) # make label transform the type we want
neg_class = torch.from_numpy(np.array([neg_class]).astype('long')) # make label transform the type we want
data = [anc_img, pos_img,neg_img]
label = [pos_class, pos_class, neg_class]
if self.transform:
data = [self.transform(img) # preprocessing the image
for img in data]
return data, label
def __len__(self):
return len(self.training_triplets)
3.4 train function for triplet loss
import torchvision.transforms as transforms
def train_facenet(epoch, model, optimizer, margin, num_triplets):
model.train()
# preprocessing function for image
transform = transforms.Compose([
transforms.Resize(32),
transforms.CenterCrop(32),
transforms.ToTensor(),
transforms.Normalize(
mean=np.array([0.4914, 0.4822, 0.4465]),
std=np.array([0.2023, 0.1994, 0.2010])),
])
# get dataset of triplet
# num_triplet is adjustable
train_set = TripletFaceDataset(root_dir = './mnist/train',
csv_name = './mnist/train.csv',
num_triplets = num_triplets,
transform = transform)
train_loader = torch.utils.data.DataLoader(train_set,
batch_size = 16,
shuffle = True)
total_loss = 0.0
for batch_idx, (data, target) in enumerate(train_loader):
# load data to gpu
data[0], target[0] = data[0].cuda(device='cuda:1'), target[0].cuda(device='cuda:1') # anchor to cuda
data[1], target[1] = data[1].cuda(device='cuda:1'), target[1].cuda(device='cuda:1') # positive to cuda
data[2], target[2] = data[2].cuda(device='cuda:1'), target[2].cuda(device='cuda:1') # negative to cuda
data[0], target[0] = Variable(data[0]), Variable(target[0]) # anchor
data[1], target[1] = Variable(data[1]), Variable(target[1]) # positive
data[2], target[2] = Variable(data[2]), Variable(target[2]) # negative
# zero setting the grad
optimizer.zero_grad()
# forward
anchor = model.forward(data[0])
positive = model.forward(data[1])
negative = model.forward(data[2])
# margin is adjustable
loss = TripletLoss(margin=margin, num_classes=10).forward(anchor, positive, negative) # get triplet loss
total_loss += loss.item()
# back-propagating
loss.backward()
optimizer.step()
context = 'Train Epoch: {} [{}/{}], Average loss: {:.4f}'.format(
epoch, len(train_loader.dataset), len(train_loader.dataset), total_loss / len(train_loader))
print(context)
3.5 test function for triplet loss
关于如何测试的问题,由于triplet loss训练的resnet18网络没有分类器,这个网络的最后一层的输出是一个维度为embedding_size的向量,我们把它当作由模型提取出的特征,所以利用这个特征来做测试。首先保存下训练集上所有图片的特征和标签,用sklearn库的KNeighborsClassifier()拟合成一个KNN分类器,这里的K表示领域的个数,K是一个可调节的参数,在测试集上做验证时,提取图片的特征用KNN分类器做预测即可。
Question 4
仔细阅读下面代码,回答问题:下面的预测方法为KNN预测,若要改为中心点预测的方式,即找出每个类别的离均值点最近的图片做最近邻预测,请简述找出中心点的方法,无需写代码。
Answer 4
- First, the model is used to output the one-hot vectors of each type of image - cluster.
- Then, the mean value - center - of each cluster is calculated.
- Images who are relatively closer to the center in the cluster are found as the representatives.
- In the prediction, the similarity between the image and the centers is calculated.
- Finally, we obtain the classification result.
from sklearn import neighbors
import pandas
import matplotlib.pyplot as plt
def KNN_classifier(model, epoch, n_neighbors):
'''
use all train set data to make KNN classifier
'''
model.eval()
# preprocessing function for image
transform = transforms.Compose([
transforms.Resize(32),
transforms.ToTensor(),
transforms.Normalize(
mean=np.array([0.485, 0.456, 0.406]),
std=np.array([0.229, 0.224, 0.225])),
])
# prepare dataset by ImageFolder, data should be classified by directory
train_set = torchvision.datasets.ImageFolder(root='./mnist/train', transform=transform)
train_loader = torch.utils.data.DataLoader(train_set, batch_size=32, shuffle=False)
features, labels =[], [] # store features and labels
for i, (data, target) in enumerate(train_loader):
# load data to gpu
data, target = data.cuda(device='cuda:1'), target.cuda(device='cuda:1')
data, target = Variable(data), Variable(target)
# forward
output = model(data)
# get features and labels to make knn classifier
features.extend(output.data.cpu().numpy())
labels.extend(target.data.cpu().numpy())
# n_neighbor is adjustable
clf = neighbors.KNeighborsClassifier(n_neighbors=n_neighbors)
clf.fit(features, labels)
return clf
def find_nearest_image(feature, label, model, clf):
model.eval()
# preprocessing function for image
transform = transforms.Compose([
transforms.Resize(32),
transforms.ToTensor(),
transforms.Normalize(
mean=np.array([0.485, 0.456, 0.406]),
std=np.array([0.229, 0.224, 0.225])),
])
frame = pandas.read_csv('./mnist/train.csv')
# prepare dataset by ImageFolder, data should be classified by directory
train_set = torchvision.datasets.ImageFolder(root = './mnist/train', transform = transform)
train_loader = torch.utils.data.DataLoader(train_set, batch_size = 32, shuffle = False)
features = []
targets = []
for i, (data, target) in enumerate(train_loader):
# load data to gpu
data, target = data.cuda(device='cuda:1'), target.cuda(device='cuda:1')
data, target = Variable(data), Variable(target)
# forward
output = model.forward(data)
features.extend(output.data.cpu().numpy())
targets.extend(target.data.cpu().numpy())
min_index = -1
min_dist = 0
for index in range(len(features)):
if targets[index] == label:
dist = np.linalg.norm(feature - features[index])
if min_index == -1:
min_dist = dist
min_index = 0
else:
if dist < min_dist:
min_dist = dist
min_index = index
return os.path.join('./mnist/train/', str(frame['name'][min_index]), str(frame['id'][min_index]) + '.png')
def test_facenet(epoch, model, clf, test = True, last = False):
model.eval()
# preprocessing function for image
transform = transforms.Compose([
transforms.Resize(32),
transforms.ToTensor(),
transforms.Normalize(
mean=np.array([0.485, 0.456, 0.406]),
std=np.array([0.229, 0.224, 0.225])),
])
frame = pandas.read_csv('./mnist/test.csv')
# prepare dataset by ImageFolder, data should be classified by directory
test_set = torchvision.datasets.ImageFolder(root = './mnist/test' if test else './mnist/train', transform = transform)
test_loader = torch.utils.data.DataLoader(test_set, batch_size = 32, shuffle = False)
correct, total = 0, 0
features = []
predicts = []
targets = []
for i, (data, target) in enumerate(test_loader):
# load data to gpu
data, target = data.cuda(device='cuda:1'), target.cuda(device='cuda:1')
data, target = Variable(data), Variable(target)
# forward
output = model.forward(data)
# predict by knn classifier
predicted = clf.predict(output.data.cpu().numpy())
correct += (torch.tensor(predicted) == target.data.cpu()).sum()
total += target.size(0)
if test and last:
features.extend(output.data.cpu().numpy())
targets.extend(target.data.cpu().numpy())
predicts.extend(torch.tensor(predicted).numpy())
if test and last:
err_count = 0
for index in range(len(features)):
if not predicts[index] == targets[index]:
image_path = os.path.join('./mnist/test/', str(frame['name'][index]), str(frame['id'][index]) + '.png')
image = Image.open(image_path).convert('RGB')
plt.subplot(1,3,1)
plt.imshow(image)
# save origin images
error_image_path = os.path.join('./pics/errors/', 'img%d_origin_(%d).png' %(err_count, targets[index]))
image.save(error_image_path)
path_nearest_target = find_nearest_image(features[index], targets[index], model, clf)
path_nearest_predict = find_nearest_image(features[index], predicts[index], model, clf)
image_nearest_target = Image.open(path_nearest_target).convert('RGB')
image_nearest_predict = Image.open(path_nearest_predict).convert('RGB')
plt.subplot(1,3,2)
plt.imshow(image_nearest_target)
plt.subplot(1,3,3)
plt.imshow(image_nearest_predict)
plt.show()
# save nearest taget
target_image_path = os.path.join('./pics/errors/', 'img%d_near_tgt_(%d).png' %(err_count, targets[index]))
image_nearest_target.save(target_image_path)
# save nearest predict
predict_image_path = os.path.join('./pics/errors/', 'img%d_near_pdc_(%d).png' %(err_count, predicts[index]))
image_nearest_predict.save(predict_image_path)
err_count += 1
print("Error images saved!")
context = 'Accuracy of model in ' + ('test' if test else 'train') + \
' set is {}/{}({:.2f}%)'.format(correct, total, 100. * float(correct) / float(total))
print(context)
3.6训练与测试
def run_facenet():
# hyper parameter
lr = 0.008
margin = 2.0
num_triplets = 8000
n_neighbors = 5
embedding_size = 128
num_epochs = 5
# embedding_size is adjustable
model = ResNet18(embedding_size, 10)
# load model into GPU device
device = torch.device('cuda:1')
model = model.to(device)
if device == 'cuda':
model = torch.nn.DataParallel(model)
cudnn.benchmark = True
# define the optimizer, lr、momentum、weight_decay is adjustable
optimizer = optim.SGD(model.parameters(), lr=lr, momentum=0.9, weight_decay=5e-4)
print('start training')
for epoch in range(num_epochs):
train_facenet(epoch, model, optimizer, margin, num_triplets) # train resnet18 with triplet loss
clf = KNN_classifier(model, epoch, n_neighbors) # get knn classifier
last = False
if epoch == num_epochs - 1:
last = True
test_facenet(epoch, model, clf, False) # validate train set
test_facenet(epoch, model, clf, True, last) # validate test set
if (epoch + 1) % 4 == 0 :
lr = lr / 3
for param_group in optimizer.param_groups:
param_group['lr'] = lr
run_facenet()
start training
Train Epoch: 0 [8000/8000], Average loss: 0.3779
Accuracy of model in train set is 1899/2000(94.95%)
Accuracy of model in test set is 939/1000(93.90%)
Train Epoch: 1 [8000/8000], Average loss: 0.0987
Accuracy of model in train set is 1967/2000(98.35%)
Accuracy of model in test set is 960/1000(96.00%)
Train Epoch: 2 [8000/8000], Average loss: 0.0527
Accuracy of model in train set is 1970/2000(98.50%)
Accuracy of model in test set is 971/1000(97.10%)
Train Epoch: 3 [8000/8000], Average loss: 0.0376
Accuracy of model in train set is 1983/2000(99.15%)
Accuracy of model in test set is 985/1000(98.50%)
Train Epoch: 4 [8000/8000], Average loss: 0.0165
Accuracy of model in train set is 1994/2000(99.70%)
image
image
image
image
image
image
image
image
image
image
image
image
image
image
image
image
image
image
image
image
Error images saved!
Accuracy of model in test set is 980/1000(98.00%)
Question 5
训练一个较好的resnet18网络,收集在测试集上所有预测错误的样本图片(1000张测试集图片,分错不应超过30张,5%)。并在训练集上找出离这个样本最近的同类样本和错类样本的图片,并作出简要分析(15%)。例如,对于一个样本sample,正确类别为A,模型将其错分为B,分别找出训练集中A类样本和B类样本中离sample最近的样本图片(注意是图片!注意一定要保存在pics文件夹或者自定义文件夹一同提交,否则TA看不到,将图片在下面展示出来)。
hints:重写 test_facenet()函数
hints:根据特征反向寻找图片可参考下列代码. 需保证shuffle=False,train.csv和test.csv均已给出
Answer 5
To collect the errors among the test set, a director named ./pics/errors serves to save the wrong predictions. test_facenet() is rewritten as above. Errors are saved in format like img + err_count +origin(near_tgt, near_pdc) + (res).
From the results, we can find that most specimen errors occurs when the scripts are too blurry or the digits are too similar. And confusions between 0 and 6, 2 and 3, 3 and 8, 5 and 3 ... are common.
3.7 Hard triplet
Triplet loss的性能与采样方式有很大的关系,这里简述两种hard-triplet的采样方式,batch-hard与semi-hard。
Batch hard
对于每一个minibatch,随机选择P个类,每一类随机挑选K张不同的图片,即一个minibatch有PxK张不同的图片。每一张图片都作为anchor,找出minibatch里面距离anchor最远的正样本和距离最近的负样本,组成一个triplet。loss可表示为:
batch_hard
Semi hard
与batch-hard不同,semi-hard triplet只需要保证minibatch中anchor到positive的距离小于anchor到negative的距离即为semi-hard,见下图,不需要选出minibatch里面距离anchor最远的负样本
semi_hard
Question 6
本次实验是分类任务的最后一次实验,你对分类任务的学习有何感想?
Answer 6
- It's really any onerous work for only a tiny lift!!!
- Sometimes, GPU overcome the computing problem, but this also blind us to some extent. We usually do not attach high importance to the computing difficulties. (
Bad for algorithm innovation???) - The recognition of performances of such complicated NNs are hard to finally verdict, for we lack support from other cases.
- Some intriguing ticks such as triplet and BCE are introduced in the modules, inspires us and motivating us to eplore further at the meantime.
- ResNet and DenseNet are really siblings, they come from one family! And it's interesting to see their deviations.
- From my own perspective, the progression after ResNet is not showing a lot of progress somehow (probably because of the limitation of dataset), so the advancement is still murky for me.
- I really admire the intelligence of the people who come up with the similarity comparing and loss caculating methods, that's really amazing.
作业附加题:
pytorch实现batch-hard或semi-hard的其中一种,重新训练resnet18,对比上面的随机选择triplet的采样方法,其训练过程和结果有何不同,你有更优的方法吗?(不做不扣分,实现一种有较高加分,鼓励同学们挑战高难度)