人体姿态评估(一) OpenPose
OpenPose已经是三年前发表的文章了,在关键点检测任务中算是比较经典的文章之一,今天基于代码以及论文再将其进行梳理一遍。首先给出论文地址以及代码地址:
论文:OpenPose: Realtime Multi-Person 2D Pose Estimation using Part Affinity Fields
代码:pytorch_Realtime_Multi-Person_Pose_Estimation
一、 背景介绍
首先我们要明确什么是人体姿态检测。从上图可以看出人体姿态检测分为两个部分,第一个部分是人体关键点检测第二个部分是人体关键点躯干连接。关于人体姿态估计可以分为两个主要方式分别为:
(1) top-down
: 该方法采用的逻辑是先检测人体的目标框,再根据人体的目标框进行关键点检测,如果人多该方法速度慢,重叠的时候也不好检测,这里有关于我另一篇博客CPN服饰关键点论文及代码解读(keras)是关于top-down的方法可以参考。
(2) bottom-up
:该方法是先检测出所有所有人体的所有关键点,然后将这些关键带你对应到不同的人体。该方法复杂度高。如果两人离得十分近,容易出现模棱两可的情况,而且由于是依赖两个部件之间的关系,所以失去了对全局的信息获取。
而我们使用的openpose采用的是"bottom-up"的方法。
其次我们在了解咱们关于人体关键点有哪些,下面用coco数据集作为介绍对象。
从图中我们看出总共有:
17+1个关键点
(这里脖子是额外添加的是Right Shoulder以及LeftShoulder的中点)19个连接处
(这里的连接处指的就是关键点之间的连接)
二、 原理介绍
关于人体姿态检测我们可以总结一下其实就是有两个部分构成分别是:关键点检测
以及关键点之间的骨架连接
。这里的难点就是在于如何制做标签,如果理解了标签如何制做也就理解了一大半了。
def single_image_processing(self, image, anns, meta, meta_init):
meta.update(meta_init)
# transform image
original_size = image.size
image = self.image_transform(image)
assert image.size(2) == original_size[0]
assert image.size(1) == original_size[1]
# mask valid
valid_area = meta['valid_area']
utils.mask_valid_area(image, valid_area)
self.log.debug(meta)
heatmaps, pafs = self.get_ground_truth(anns)
heatmaps = torch.from_numpy(
heatmaps.transpose((2, 0, 1)).astype(np.float32))
pafs = torch.from_numpy(pafs.transpose((2, 0, 1)).astype(np.float32))
return image, heatmaps, pafs
从上面代码也可以看出制作groundtruth
, 主要就是heatmap
以及pafs
。
下面贴出整体内部函数代码:
def get_ground_truth(self, anns):
# self.stride 8, 因为输入是368, 但是经过backbone,所以下采样为8倍, 输出为46, 46
grid_y = int(self.input_y / self.stride)
grid_x = int(self.input_x / self.stride)
channels_heat = (self.HEATMAP_COUNT + 1) # 19(18个关键点加1个背景)
channels_paf = 2 * len(self.LIMB_IDS) # 2(x,y方向)*19(19个躯干)
heatmaps = np.zeros((int(grid_y), int(grid_x), channels_heat)) # shape 46 *46 *19
pafs = np.zeros((int(grid_y), int(grid_x), channels_paf)) # shape 46 * 46 * 38
keypoints = []
for ann in anns:
single_keypoints = np.array(ann['keypoints']).reshape(17,3)
single_keypoints = self.add_neck(single_keypoints) # 添加两个肩之间的点
keypoints.append(single_keypoints)
keypoints = np.array(keypoints)
keypoints = self.remove_illegal_joint(keypoints) # 删除不可见的点
# [0 不可见, 1遮挡, 2完全可以看到]
# confidance maps for body parts
for i in range(self.HEATMAP_COUNT):
joints = [jo[i] for jo in keypoints]
for joint in joints:
if joint[2] > 0.5:
center = joint[:2]
gaussian_map = heatmaps[:, :, i]
heatmaps[:, :, i] = putGaussianMaps(
center, gaussian_map,
7.0, grid_y, grid_x, self.stride)
# pafs
for i, (k1, k2) in enumerate(self.LIMB_IDS):
# limb
count = np.zeros((int(grid_y), int(grid_x)), dtype=np.uint32)
for joint in keypoints:
if joint[k1, 2] > 0.5 and joint[k2, 2] > 0.5:
centerA = joint[k1, :2]
centerB = joint[k2, :2]
vec_map = pafs[:, :, 2 * i:2 * (i + 1)]
pafs[:, :, 2 * i:2 * (i + 1)], count = putVecMaps(
centerA=centerA,
centerB=centerB,
accumulate_vec_map=vec_map,
count=count, grid_y=grid_y, grid_x=grid_x, stride=self.stride
)
# background
heatmaps[:, :, -1] = np.maximum(
1 - np.max(heatmaps[:, :, :self.HEATMAP_COUNT], axis=2),
0.
)
return heatmaps, pafs
def __len__(self):
return len(self.ids)
2.1 关键点标签制作
关键点检测其实和我说的这一篇博客原理一样CPN服饰关键点论文及代码解读(keras), 简单来说就是在关键点周围生成高斯分布的标签,至于为什么不直接使用关键点的坐标,这是因为太绝对了,有时候我们在标注的时候也会出现误差,且点的标注也是有一定模糊性的,所以采用高斯分布的方式进行标注。如下图所示:
通过上图我们可以看到某一个通道上(这里是右肩通道)进行了高斯标注(0-1的概率值),越靠近中心点我们的值越大(最高值为1),当然这里有范围的,当超过一定的范围周围的很低的值一律设置为0。不同的通道代表不同的关键点特征图,这里的通道数为
18+1
, 因为还有背景信息,所以总共油19个特征图。NMS
上述图对应的代码如下:
# lib/datasets/heat.py
"""Implement the generate of every channel of ground truth heatmap.
:param centerA: int with shape (2,), every coordinate of person's keypoint.
:param accumulate_confid_map: one channel of heatmap, which is accumulated,
np.log(100) is the max value of heatmap.
:param params_transform: store the value of stride and crop_szie_y, crop_size_x
"""
def putGaussianMaps(center, accumulate_confid_map, sigma, grid_y, grid_x, stride):
# sigma(7.0) grid_x(46) grid_y(46) stride(8)
start = stride / 2.0 - 0.5
y_range = [i for i in range(int(grid_y))]
x_range = [i for i in range(int(grid_x))]
xx, yy = np.meshgrid(x_range, y_range)
xx = xx * stride + start
yy = yy * stride + start
d2 = (xx - center[0]) ** 2 + (yy - center[1]) ** 2
exponent = d2 / 2.0 / sigma / sigma
mask = exponent <= 4.6052
cofid_map = np.exp(-exponent)
cofid_map = np.multiply(mask, cofid_map)
accumulate_confid_map += cofid_map
accumulate_confid_map[accumulate_confid_map > 1.0] = 1.0
return accumulate_confid_map
这里有几处需要说明:
①
mask = exponent <= 4.6052
cofid_map = np.exp(-exponent)
cofid_map = np.multiply(mask, cofid_map)
这里指的是如果高斯边缘的值如果小于一定的值我们就将其设置为0
②
accumulate_confid_map[accumulate_confid_map > 1.0] = 1.0
如果高斯中心点大于1
,我们将其强制设置为1。这里为什么会超过1呢?当同一个类别的两个关键点的高斯分布有重叠的时候我们会看到该值会大于1的情况
③
accumulate_confid_map += cofid_map
将某一个关键点的所有heatmap相加。
一下代码就是将遍历所有的点,生成不同关键点的heatmap
# confidance maps for body parts
for i in range(self.HEATMAP_COUNT):
joints = [jo[i] for jo in keypoints]
for joint in joints:
if joint[2] > 0.5:
center = joint[:2]
gaussian_map = heatmaps[:, :, i]
heatmaps[:, :, i] = putGaussianMaps(center, gaussian_map, 7.0, grid_y, grid_x, self.stride)
④ 关键点NMS
如果存在多个body part,那么就会有叠加,选取值最大的那个。采用最大值,而不是平均,是为了保证每个峰的精度.实际操作就是在一定范围的邻域内的,只保留一个最大值,比如有两个峰值是挨着的,那么只取一个就可以,取那个最大值的那个。
2.2 PAF(Part Affinity Fields) 部位亲和场(躯干标签制作)
我们已经有了关键点,那如何将这些关键点对应在一起,也就是说哪些关键点属于同一个人的呢?如何将同一个人不同关键点连接起来,该部分将会使用PAF原理解决这个问题。
这个部分理解相对有一点难度,该想法的提出也是这篇论文的一个闪光点,他的作用就是将属于同一个人的不同关键点按顺序拼接。如下图所示,如何将这些点按照黑色线的方式连接而不是橙色的线连接。
首先我们需要一个原则,就是我该点通过什么来判断我下个连接点, 这样才能将两个连接点连在一起。
如上图如果我们能通过两点之间的连接的得分(或代价损失函数)算出来,我们可以以最高得分进行两两连接。连接方式可以通过贪恋算法,或者匈牙利匹配的算法。这里我们后面在说,这里我们先说下如何算出代价损失函数,我们就可以以最大得分(最小损失)进行关键点之间进行连接。
下面我们详细介绍一下下面的公式。
我们拿该图的手臂做例子
v表示的是手臂起始点与终点方向向量的单位向量。 ②
这里表示的是如果的像素在手臂上则向量为上述v的单位向量不在手臂上的像素则为0。 那么我们如何判断咱们的像素在不在手臂上面呢?如下第③点可以详细说明
③这里面表示的是我垂直手臂和手臂一致的方向表示手臂矩形范围。
所以我们需要在特征图上每一个固定连接区域设定一组向量(X方向, Y方向),并且总共有
19组连接方式(19*2=38个特征图)
上图有有两个分支我们沿着连线的方向(PAF)向量做投影,如果越接近则表明权重越大,上面做积分也就是说连接线的所有连线上的点都要做投影。这里有人会问了,在做关键点高斯的时候如果两个高斯重叠我们会算累加,但是如果是躯干呢?我们如果有两个同一个类别的两个肢体发生重叠(例如两个手臂发生了重叠)了我们应该怎么办呢?这里我们采用的是求平均向量, 即x方向的平均, y方向的平均
。
下面我们通过代码来分析躯干PAF的标签是如何制作的。
# pafs
for i, (k1, k2) in enumerate(self.LIMB_IDS):
# limb
# 这里使用count为了在遍历每一种躯干的时候计算图中每一个像素点发送重叠的次数, 为了方便后面重叠躯干求平均
count = np.zeros((int(grid_y), int(grid_x)), dtype=np.uint32)
for joint in keypoints:
if joint[k1, 2] > 0.5 and joint[k2, 2] > 0.5: # 这里选择index为2的位置是为了找到点为非关键点(0没有点1隐藏或2非隐藏)
centerA = joint[k1, :2]
centerB = joint[k2, :2]
vec_map = pafs[:, :, 2 * i:2 * (i + 1)] # shape (46, 46, 2)
pafs[:, :, 2 * i:2 * (i + 1)], count = putVecMaps(
centerA=centerA,
centerB=centerB,
accumulate_vec_map=vec_map,
count=count, grid_y=grid_y, grid_x=grid_x, stride=self.stride
)
下面我们看下putVevMaps
函数
def putVecMaps(centerA, centerB, accumulate_vec_map, count, grid_y, grid_x, stride):
centerA = centerA.astype(float)
centerB = centerB.astype(float)
thre = 1 # limb width
centerB = centerB / stride
centerA = centerA / stride
limb_vec = centerB - centerA
norm = np.linalg.norm(limb_vec)
if (norm == 0.0):
# print 'limb is too short, ignore it...'
return accumulate_vec_map, count
limb_vec_unit = limb_vec / norm
# print 'limb unit vector: {}'.format(limb_vec_unit)
# To make sure not beyond the border of this two points
min_x = max(int(round(min(centerA[0], centerB[0]) - thre)), 0)
max_x = min(int(round(max(centerA[0], centerB[0]) + thre)), grid_x)
min_y = max(int(round(min(centerA[1], centerB[1]) - thre)), 0)
max_y = min(int(round(max(centerA[1], centerB[1]) + thre)), grid_y)
range_x = list(range(int(min_x), int(max_x), 1))
range_y = list(range(int(min_y), int(max_y), 1))
xx, yy = np.meshgrid(range_x, range_y)
ba_x = xx - centerA[0] # the vector from (x,y) to centerA
ba_y = yy - centerA[1]
limb_width = np.abs(ba_x * limb_vec_unit[1] - ba_y * limb_vec_unit[0])
mask = limb_width < thre # mask is 2D
vec_map = np.copy(accumulate_vec_map) * 0.0
vec_map[yy, xx] = np.repeat(mask[:, :, np.newaxis], 2, axis=2)
vec_map[yy, xx] *= limb_vec_unit[np.newaxis, np.newaxis, :]
mask = np.logical_or.reduce(
(np.abs(vec_map[:, :, 0]) > 0, np.abs(vec_map[:, :, 1]) > 0))
accumulate_vec_map = np.multiply(
accumulate_vec_map, count[:, :, np.newaxis])
accumulate_vec_map += vec_map
count[mask == True] += 1
mask = count == 0
count[mask == True] = 1
accumulate_vec_map = np.divide(accumulate_vec_map, count[:, :, np.newaxis])
count[mask == True] = 0
return accumulate_vec_map, count
centerA
, centerB
代表两个不同的点。
limb_vec_unit
表示的是躯干单位向量
这里的min_x, min_y, max_x, max_y
表示的是两个点上下延伸左右延伸一段距离构成的矩形框(蓝色边缘扩展the=1)。如下所示:
range_x = list(range(int(min_x), int(max_x), 1))
range_y = list(range(int(min_y), int(max_y), 1))
xx, yy = np.meshgrid(range_x, range_y)
ba_x = xx - centerA[0] # the vector from (x,y) to centerA
ba_y = yy - centerA[1]
limb_width = np.abs(ba_x * limb_vec_unit[1] - ba_y * limb_vec_unit[0])
mask = limb_width < thre # mask is 2D
vec_map = np.copy(accumulate_vec_map) * 0.0
vec_map[yy, xx] = np.repeat(mask[:, :, np.newaxis], 2, axis=2)
vec_map[yy, xx] *= limb_vec_unit[np.newaxis, np.newaxis, :]
limb_width = np.abs(ba_x * limb_vec_unit[1] - ba_y * limb_vec_unit[0])
通过该公式可以理解在近躯干上,相当于接受躯干的向量,否则不在的为0。
mask = np.logical_or.reduce(
(np.abs(vec_map[:, :, 0]) > 0, np.abs(vec_map[:, :, 1]) > 0))
accumulate_vec_map = np.multiply(
accumulate_vec_map, count[:, :, np.newaxis])
accumulate_vec_map += vec_map
count[mask == True] += 1
mask = count == 0
count[mask == True] = 1
accumulate_vec_map = np.divide(accumulate_vec_map, count[:, :, np.newaxis])
count[mask == True] = 0
mask = np.logical_or.reduce( (np.abs(vec_map[:, :, 0]) > 0, np.abs(vec_map[:, :, 1]) > 0))
确定哪些值是躯干(Ture)和不是躯干(False)
accumulate_vec_map = np.multiply(
accumulate_vec_map, count[:, :, np.newaxis])
accumulate_vec_map += vec_map
count[mask == True] += 1
mask = count == 0
count[mask == True] = 1
accumulate_vec_map = np.divide(accumulate_vec_map, count[:, :, np.newaxis])
count[mask == True] = 0
求在重复躯干上的向量平均,这样就完成了我们躯干的标签了。
2.3 网络模型介绍
从上述做标签来看可以看出该网络可以分为两个分支,一个是关键点检测的分支,一个是躯干检测分支(即我们俗称的PAF)。
上面这一张图完美介绍了我们关于姿态检测的网络,首先从图中可以看出该网络结构在提取网络采用的是vgg,后面接了好几个stage,对于每一个stage分别通过两个分支进行预测,并且对每个阶段的预测最终都需要concate再计算loss,也就是说在不同的感受野下都需要算不同stage的loss。
下面将模型的forward代码展示出来:
def forward(self, x):
saved_for_loss = []
out1 = self.model0(x)
out1_1 = self.model1_1(out1)
out1_2 = self.model1_2(out1)
out2 = torch.cat([out1_1, out1_2, out1], 1)
saved_for_loss.append(out1_1)
saved_for_loss.append(out1_2)
out2_1 = self.model2_1(out2)
out2_2 = self.model2_2(out2)
out3 = torch.cat([out2_1, out2_2, out1], 1)
saved_for_loss.append(out2_1)
saved_for_loss.append(out2_2)
out3_1 = self.model3_1(out3)
out3_2 = self.model3_2(out3)
out4 = torch.cat([out3_1, out3_2, out1], 1)
saved_for_loss.append(out3_1)
saved_for_loss.append(out3_2)
out4_1 = self.model4_1(out4)
out4_2 = self.model4_2(out4)
out5 = torch.cat([out4_1, out4_2, out1], 1)
saved_for_loss.append(out4_1)
saved_for_loss.append(out4_2)
out5_1 = self.model5_1(out5)
out5_2 = self.model5_2(out5)
out6 = torch.cat([out5_1, out5_2, out1], 1)
saved_for_loss.append(out5_1)
saved_for_loss.append(out5_2)
out6_1 = self.model6_1(out6)
out6_2 = self.model6_2(out6)
saved_for_loss.append(out6_1)
saved_for_loss.append(out6_2)
return (out6_1, out6_2), saved_for_loss
pipeline
模型看起来很简单我们再看一下loss
criterion = nn.MSELoss(reduction='mean').cuda()
# Compute losses
loss1 = criterion(pred1, vec_temp)
loss2 = criterion(pred2, heat_temp)
很简单的MSE loss
。
2.4 模型预测
说完了模型模型的训练,最重要的就是模型的预测。模型的预测涉及到一个重要的问题就是关键点如何利用我们已经求出来的躯干loss进行互联。
2.4.1 关键点NMS
即两个峰值很近我们选用最大值那个点。2.4.2 关键点配对
对于关键点的配对我们可以采用匈牙利匹配的算法进行,我们通过躯干的得分来做依据,进行通用匹配,使得我们整体的得分值最大。
参考:
[1] OpenPose