第二课:高斯,单峰,连续, 卡尔曼滤波
2019-03-01 本文已影响2人
Allen的光影天地
一维高斯分布
代码
关键问题描述都放在代码注释里
# Write a program that will iteratively update and
# predict based on the location measurements
# and inferred motions shown below.
# 用传感器更新当前状态,主要是用高斯乘法
# 也就是所谓的纠偏
# 新的方差比原来两个方差都小
# 新的均值在原来两个均值之间
def update(mean1, var1, mean2, var2):
new_mean = float(var2 * mean1 + var1 * mean2) / (var1 + var2)
new_var = 1./(1./var1 + 1./var2)
return [new_mean, new_var]
# 移动部分就是直接按照当前的速度或者方向,向前推进对应的距离
# 在移动的过程,我们直接将移动造成的均值和方差加在原来均值和方差之上
# 势必会导致方差变大,这也是合乎常理,因为移动预测势必会造成信息丢失
def predict(mean1, var1, mean2, var2):
new_mean = mean1 + mean2
new_var = var1 + var2
return [new_mean, new_var]
# 传感器的更新状态
measurements = [5., 6., 7., 9., 10.]
# 每次移动的距离
motion = [1., 1., 2., 1., 1.]
#传感器方差
measurement_sig = 4.
# 移动的方差
motion_sig = 2.
# 初始均值
mu = 0.
# 初始方差
sig = 10000.
#Please print out ONLY the final values of the mean
#and the variance in a list [mu, sig].
# Insert code here
for i in range(len(measurements)):
[mu, sig] = update(mu, sig, measurements[i], measurement_sig)
print ('update',[mu, sig])
[mu, sig] = predict(mu, sig, motion[i], motion_sig)
print ('predict',[mu, sig])
print [mu, sig]
输出
update [4.998000799680128, 3.9984006397441023]
predict [5.998000799680128, 5.998400639744102]
update [5.999200191953932, 2.399744061425258]
predict [6.999200191953932, 4.399744061425258]
update [6.999619127420922, 2.0951800575117594]
predict [8.999619127420921, 4.09518005751176]
update [8.999811802788143, 2.0235152416216957]
predict [9.999811802788143, 4.023515241621696]
update [9.999906177177365, 2.0058615808441944]
predict [10.999906177177365, 4.005861580844194]
分析
可以很直观的看到,传感器对整体定位的逐步调整
mu = 0
sig = 0.0001
值得注意的一点是:
如上述代码
如果我们将初始方差调到很小:意味着我们非常确信最开始的定位是准确的。
我们同样会有对应的一组输出
update [1.2499968750078127e-05, 9.999975000062502e-06]
predict [1.00001249996875, 2.000009999975]
update [2.6666805554976856, 1.3333377777592592]
predict [3.6666805554976856, 3.333337777759259]
update [5.181826859462716, 1.8181831404895565]
predict [7.181826859462716, 3.8181831404895563]
update [8.06977203891725, 1.9534887182243577]
predict [9.06977203891725, 3.9534887182243574]
update [9.532166075020006, 1.9883041811151205]
predict [10.532166075020006, 3.9883041811151205]
但是随着一次次的更新,我们发现,传感器具有更高的可信度,于是最终结果也是非常接近我们预期效果位置11
高维高斯分布
先看高维情况的卡尔曼滤波
https://zhuanlan.zhihu.com/p/39912633
二维4阶卡尔曼滤波实现
# Write a function 'kalman_filter' that implements a multi-
# dimensional Kalman Filter for the example given
from math import *
class matrix:
# implements basic operations of a matrix class
def __init__(self, value):
self.value = value
self.dimx = len(value)
self.dimy = len(value[0])
if value == [[]]:
self.dimx = 0
def zero(self, dimx, dimy):
# check if valid dimensions
if dimx < 1 or dimy < 1:
raise ValueError("Invalid size of matrix")
else:
self.dimx = dimx
self.dimy = dimy
self.value = [[0 for row in range(dimy)] for col in range(dimx)]
def identity(self, dim):
# check if valid dimension
if dim < 1:
raise ValueError("Invalid size of matrix")
else:
self.dimx = dim
self.dimy = dim
self.value = [[0 for row in range(dim)] for col in range(dim)]
for i in range(dim):
self.value[i][i] = 1
def show(self):
for i in range(self.dimx):
print(self.value[i])
print(' ')
def __add__(self, other):
# check if correct dimensions
if self.dimx != other.dimx or self.dimy != other.dimy:
raise ValueError("Matrices must be of equal dimensions to add")
else:
# add if correct dimensions
res = matrix([[]])
res.zero(self.dimx, self.dimy)
for i in range(self.dimx):
for j in range(self.dimy):
res.value[i][j] = self.value[i][j] + other.value[i][j]
return res
def __sub__(self, other):
# check if correct dimensions
if self.dimx != other.dimx or self.dimy != other.dimy:
raise ValueError("Matrices must be of equal dimensions to subtract")
else:
# subtract if correct dimensions
res = matrix([[]])
res.zero(self.dimx, self.dimy)
for i in range(self.dimx):
for j in range(self.dimy):
res.value[i][j] = self.value[i][j] - other.value[i][j]
return res
def __mul__(self, other):
# check if correct dimensions
if self.dimy != other.dimx:
raise ValueError("Matrices must be m*n and n*p to multiply")
else:
# multiply if correct dimensions
res = matrix([[]])
res.zero(self.dimx, other.dimy)
for i in range(self.dimx):
for j in range(other.dimy):
for k in range(self.dimy):
res.value[i][j] += self.value[i][k] * other.value[k][j]
return res
def transpose(self):
# compute transpose
res = matrix([[]])
res.zero(self.dimy, self.dimx)
for i in range(self.dimx):
for j in range(self.dimy):
res.value[j][i] = self.value[i][j]
return res
# Thanks to Ernesto P. Adorio for use of Cholesky and CholeskyInverse functions
def Cholesky(self, ztol=1.0e-5):
# Computes the upper triangular Cholesky factorization of
# a positive definite matrix.
res = matrix([[]])
res.zero(self.dimx, self.dimx)
for i in range(self.dimx):
S = sum([(res.value[k][i]) ** 2 for k in range(i)])
d = self.value[i][i] - S
if abs(d) < ztol:
res.value[i][i] = 0.0
else:
if d < 0.0:
raise ValueError("Matrix not positive-definite")
res.value[i][i] = sqrt(d)
for j in range(i + 1, self.dimx):
S = sum([res.value[k][i] * res.value[k][j] for k in range(self.dimx)])
if abs(S) < ztol:
S = 0.0
try:
res.value[i][j] = (self.value[i][j] - S) / res.value[i][i]
except:
raise ValueError("Zero diagonal")
return res
def CholeskyInverse(self):
# Computes inverse of matrix given its Cholesky upper Triangular
# decomposition of matrix.
res = matrix([[]])
res.zero(self.dimx, self.dimx)
# Backward step for inverse.
for j in reversed(range(self.dimx)):
tjj = self.value[j][j]
S = sum([self.value[j][k] * res.value[j][k] for k in range(j + 1, self.dimx)])
res.value[j][j] = 1.0 / tjj ** 2 - S / tjj
for i in reversed(range(j)):
res.value[j][i] = res.value[i][j] = -sum(
[self.value[i][k] * res.value[k][j] for k in range(i + 1, self.dimx)]) / self.value[i][i]
return res
def inverse(self):
aux = self.Cholesky()
res = aux.CholeskyInverse()
return res
def __repr__(self):
return repr(self.value)
# z 是measurement
def update(x, P, z):
y = z - H * x
S = H * P * H.transpose() + R # 加入波动的协方差
K = P * H.transpose() * S.inverse()
x = x + (K * y)
P = (I - K * H) * P
return x, P
def prediction(x, P):
x = F * x + u
P = F * P * F.transpose()
return x, P
def kalman_filter(x, P):
for n in range(len(measurements)):
z = matrix([measurements[n]]).transpose()
# 本例中是先预测后测量
x, P = prediction(x, P)
x, P = update(x, P, z)
return x, P
# measurements = [1, 2, 3] # 传感器的均值
# x = matrix([[0.], [0.]]) # initial state (location and velocity) 均值
# P = matrix([[1000., 0.], [0., 1000.]]) # initial uncertainty 协方差
# u = matrix([[0.], [0.]]) # external motion 其他控制建模,比如司机手刹
# F = matrix([[1., 1.], [0, 1.]]) # next state function 用于预测
# H = matrix([[1., 0.]]) # measurement function
# R = matrix([[1.]]) # measurement uncertainty 传感器的协方差
# I = matrix([[1., 0.], [0., 1.]]) # identity matrix
# measurements = [[5., 10.], [6., 8.], [7., 6.], [8., 4.], [9., 2.], [10., 0.]]
# initial_xy = [4., 12.]
# measurements = [[1., 4.], [6., 0.], [11., -4.], [16., -8.]]
# initial_xy = [-4., 8.]
measurements = [[1., 17.], [1., 15.], [1., 13.], [1., 11.]]
initial_xy = [1., 19.]
dt = 0.1
x = matrix([[initial_xy[0]], [initial_xy[1]], [0.], [0.]]) # initial state (location and velocity)
u = matrix([[0.], [0.], [0.], [0.]]) # external motion
P = matrix([[0., 0., 0., 0.],
[0., 0., 0., 0.],
[0., 0., 1000., 0.],
[0., 0., 0., 1000.]]) # initial uncertainty: 0 for positions x and y, 1000 for the two velocities
F = matrix([[1., 0, dt, 0.],
[0., 1, 0., dt],
[0., 0, 1., 0.],
[0., 0., 0., 1.]]) # next state function: generalize the 2d version to 4d
H = matrix([[1., 0., 0., 0.],
[0.,1.,0.,0.]]) # measurement function: reflect the fact that we observe x and y but not the two velocities
R = matrix([[0.1, 0], [0, 0.1]]) # measurement uncertainty: use 2x2 matrix with 0.1 as main diagonal
I = matrix([[1., 0.,0.,0.], [0., 1.,0.,0.],[0.,0.,1.,0.],[0.,0.,0.,1.]]) # 4d identity matrix
a = kalman_filter(x, P)
a[0].show()
a[1].show()
核心为后面,前面是实现了一个矩阵的API便于使用,其实完全可以用numpy
本例中使用的卡尔曼滤波公式
其中 R矩阵 也就是传感器的误差(方差),在真实情况下,可以通过实验获得,或者最开始设置一个较大的数字,然后观看效果。如果不错的话,就保留