第二课:高斯,单峰,连续, 卡尔曼滤波

2019-03-01  本文已影响2人  Allen的光影天地

一维高斯分布

代码

关键问题描述都放在代码注释里

# Write a program that will iteratively update and
# predict based on the location measurements 
# and inferred motions shown below. 

# 用传感器更新当前状态,主要是用高斯乘法
# 也就是所谓的纠偏
# 新的方差比原来两个方差都小
# 新的均值在原来两个均值之间
def update(mean1, var1, mean2, var2):
    new_mean = float(var2 * mean1 + var1 * mean2) / (var1 + var2)
    new_var = 1./(1./var1 + 1./var2)
    return [new_mean, new_var]


# 移动部分就是直接按照当前的速度或者方向,向前推进对应的距离
# 在移动的过程,我们直接将移动造成的均值和方差加在原来均值和方差之上
# 势必会导致方差变大,这也是合乎常理,因为移动预测势必会造成信息丢失
def predict(mean1, var1, mean2, var2):
    new_mean = mean1 + mean2
    new_var = var1 + var2
    return [new_mean, new_var]

# 传感器的更新状态
measurements = [5., 6., 7., 9., 10.]
# 每次移动的距离
motion = [1., 1., 2., 1., 1.]
#传感器方差
measurement_sig = 4.
# 移动的方差
motion_sig = 2.
# 初始均值
mu = 0.
# 初始方差
sig = 10000.

#Please print out ONLY the final values of the mean
#and the variance in a list [mu, sig]. 

# Insert code here

for i in range(len(measurements)):
    
    [mu, sig] = update(mu, sig, measurements[i], measurement_sig)
    print ('update',[mu, sig])
    [mu, sig] = predict(mu, sig, motion[i], motion_sig)
    print ('predict',[mu, sig])

print [mu, sig]

输出

update [4.998000799680128, 3.9984006397441023]
predict [5.998000799680128, 5.998400639744102]
update [5.999200191953932, 2.399744061425258]
predict [6.999200191953932, 4.399744061425258]
update [6.999619127420922, 2.0951800575117594]
predict [8.999619127420921, 4.09518005751176]
update [8.999811802788143, 2.0235152416216957]
predict [9.999811802788143, 4.023515241621696]
update [9.999906177177365, 2.0058615808441944]
predict [10.999906177177365, 4.005861580844194]

分析

可以很直观的看到,传感器对整体定位的逐步调整

mu = 0
sig = 0.0001

值得注意的一点是:
如上述代码
如果我们将初始方差调到很小:意味着我们非常确信最开始的定位是准确的。
我们同样会有对应的一组输出

update [1.2499968750078127e-05, 9.999975000062502e-06]
predict [1.00001249996875, 2.000009999975]
update [2.6666805554976856, 1.3333377777592592]
predict [3.6666805554976856, 3.333337777759259]
update [5.181826859462716, 1.8181831404895565]
predict [7.181826859462716, 3.8181831404895563]
update [8.06977203891725, 1.9534887182243577]
predict [9.06977203891725, 3.9534887182243574]
update [9.532166075020006, 1.9883041811151205]
predict [10.532166075020006, 3.9883041811151205]

但是随着一次次的更新,我们发现,传感器具有更高的可信度,于是最终结果也是非常接近我们预期效果位置11

高维高斯分布

先看高维情况的卡尔曼滤波
https://zhuanlan.zhihu.com/p/39912633

一图胜千言
二维4阶卡尔曼滤波实现
# Write a function 'kalman_filter' that implements a multi-
# dimensional Kalman Filter for the example given

from math import *


class matrix:

    # implements basic operations of a matrix class

    def __init__(self, value):
        self.value = value
        self.dimx = len(value)
        self.dimy = len(value[0])
        if value == [[]]:
            self.dimx = 0

    def zero(self, dimx, dimy):
        # check if valid dimensions
        if dimx < 1 or dimy < 1:
            raise ValueError("Invalid size of matrix")
        else:
            self.dimx = dimx
            self.dimy = dimy
            self.value = [[0 for row in range(dimy)] for col in range(dimx)]

    def identity(self, dim):
        # check if valid dimension
        if dim < 1:
            raise ValueError("Invalid size of matrix")
        else:
            self.dimx = dim
            self.dimy = dim
            self.value = [[0 for row in range(dim)] for col in range(dim)]
            for i in range(dim):
                self.value[i][i] = 1

    def show(self):
        for i in range(self.dimx):
            print(self.value[i])
        print(' ')

    def __add__(self, other):
        # check if correct dimensions
        if self.dimx != other.dimx or self.dimy != other.dimy:
            raise ValueError("Matrices must be of equal dimensions to add")
        else:
            # add if correct dimensions
            res = matrix([[]])
            res.zero(self.dimx, self.dimy)
            for i in range(self.dimx):
                for j in range(self.dimy):
                    res.value[i][j] = self.value[i][j] + other.value[i][j]
            return res

    def __sub__(self, other):
        # check if correct dimensions
        if self.dimx != other.dimx or self.dimy != other.dimy:
            raise ValueError("Matrices must be of equal dimensions to subtract")
        else:
            # subtract if correct dimensions
            res = matrix([[]])
            res.zero(self.dimx, self.dimy)
            for i in range(self.dimx):
                for j in range(self.dimy):
                    res.value[i][j] = self.value[i][j] - other.value[i][j]
            return res

    def __mul__(self, other):
        # check if correct dimensions
        if self.dimy != other.dimx:
            raise ValueError("Matrices must be m*n and n*p to multiply")
        else:
            # multiply if correct dimensions
            res = matrix([[]])
            res.zero(self.dimx, other.dimy)
            for i in range(self.dimx):
                for j in range(other.dimy):
                    for k in range(self.dimy):
                        res.value[i][j] += self.value[i][k] * other.value[k][j]
            return res

    def transpose(self):
        # compute transpose
        res = matrix([[]])
        res.zero(self.dimy, self.dimx)
        for i in range(self.dimx):
            for j in range(self.dimy):
                res.value[j][i] = self.value[i][j]
        return res

    # Thanks to Ernesto P. Adorio for use of Cholesky and CholeskyInverse functions

    def Cholesky(self, ztol=1.0e-5):
        # Computes the upper triangular Cholesky factorization of
        # a positive definite matrix.
        res = matrix([[]])
        res.zero(self.dimx, self.dimx)

        for i in range(self.dimx):
            S = sum([(res.value[k][i]) ** 2 for k in range(i)])
            d = self.value[i][i] - S
            if abs(d) < ztol:
                res.value[i][i] = 0.0
            else:
                if d < 0.0:
                    raise ValueError("Matrix not positive-definite")
                res.value[i][i] = sqrt(d)
            for j in range(i + 1, self.dimx):
                S = sum([res.value[k][i] * res.value[k][j] for k in range(self.dimx)])
                if abs(S) < ztol:
                    S = 0.0
                try:
                    res.value[i][j] = (self.value[i][j] - S) / res.value[i][i]
                except:
                    raise ValueError("Zero diagonal")
        return res

    def CholeskyInverse(self):
        # Computes inverse of matrix given its Cholesky upper Triangular
        # decomposition of matrix.
        res = matrix([[]])
        res.zero(self.dimx, self.dimx)

        # Backward step for inverse.
        for j in reversed(range(self.dimx)):
            tjj = self.value[j][j]
            S = sum([self.value[j][k] * res.value[j][k] for k in range(j + 1, self.dimx)])
            res.value[j][j] = 1.0 / tjj ** 2 - S / tjj
            for i in reversed(range(j)):
                res.value[j][i] = res.value[i][j] = -sum(
                    [self.value[i][k] * res.value[k][j] for k in range(i + 1, self.dimx)]) / self.value[i][i]
        return res

    def inverse(self):
        aux = self.Cholesky()
        res = aux.CholeskyInverse()
        return res

    def __repr__(self):
        return repr(self.value)


# z 是measurement
def update(x, P, z):
    y = z - H * x
    S = H * P * H.transpose() + R  # 加入波动的协方差
    K = P * H.transpose() * S.inverse()
    x = x + (K * y)
    P = (I - K * H) * P
    return x, P


def prediction(x, P):
    x = F * x + u
    P = F * P * F.transpose()
    return x, P


def kalman_filter(x, P):
    for n in range(len(measurements)):
        z = matrix([measurements[n]]).transpose()
        # 本例中是先预测后测量
        x, P = prediction(x, P)
        x, P = update(x, P, z)

    return x, P


# measurements = [1, 2, 3]  # 传感器的均值
# x = matrix([[0.], [0.]])  # initial state (location and velocity) 均值
# P = matrix([[1000., 0.], [0., 1000.]])  # initial uncertainty 协方差
# u = matrix([[0.], [0.]])  # external motion 其他控制建模,比如司机手刹
# F = matrix([[1., 1.], [0, 1.]])  # next state function 用于预测
# H = matrix([[1., 0.]])  # measurement function
# R = matrix([[1.]])  # measurement uncertainty 传感器的协方差
# I = matrix([[1., 0.], [0., 1.]])  # identity matrix


# measurements = [[5., 10.], [6., 8.], [7., 6.], [8., 4.], [9., 2.], [10., 0.]]
# initial_xy = [4., 12.]

# measurements = [[1., 4.], [6., 0.], [11., -4.], [16., -8.]]
# initial_xy = [-4., 8.]

measurements = [[1., 17.], [1., 15.], [1., 13.], [1., 11.]]
initial_xy = [1., 19.]

dt = 0.1

x = matrix([[initial_xy[0]], [initial_xy[1]], [0.], [0.]]) # initial state (location and velocity)
u = matrix([[0.], [0.], [0.], [0.]]) # external motion


P = matrix([[0., 0., 0., 0.],
            [0., 0., 0., 0.],
            [0., 0., 1000., 0.],
            [0., 0., 0., 1000.]]) # initial uncertainty: 0 for positions x and y, 1000 for the two velocities
F = matrix([[1., 0, dt, 0.],
            [0., 1, 0., dt],
            [0., 0, 1., 0.],
            [0., 0., 0., 1.]]) # next state function: generalize the 2d version to 4d
H = matrix([[1., 0., 0., 0.],
            [0.,1.,0.,0.]]) # measurement function: reflect the fact that we observe x and y but not the two velocities
R = matrix([[0.1, 0], [0, 0.1]]) # measurement uncertainty: use 2x2 matrix with 0.1 as main diagonal
I = matrix([[1., 0.,0.,0.], [0., 1.,0.,0.],[0.,0.,1.,0.],[0.,0.,0.,1.]])  # 4d identity matrix

a = kalman_filter(x, P)

a[0].show()
a[1].show()

核心为后面,前面是实现了一个矩阵的API便于使用,其实完全可以用numpy


本例中使用的卡尔曼滤波公式

其中 R矩阵 也就是传感器的误差(方差),在真实情况下,可以通过实验获得,或者最开始设置一个较大的数字,然后观看效果。如果不错的话,就保留

上一篇下一篇

猜你喜欢

热点阅读