pythonでカルマンフィルタを扱えるpykalmanというライブラリがあるんですが,使い方をよく忘れるので簡単にメモしておきます.
pykalmanには線形カルマンフィルタと非線形のUnscentedカルマンフィルタ(UKF)の実装が用意されていて,更にUKFには通常のUnscentedKalmanFilterクラスとAdditiveUnscentedKalmanFilterクラスがあります.AdditiveUnscentedKalmanFilterは通常のUKFより計算が軽くてロバストですが,ノイズが単純な加算でモデル化できる場合のみに使用できます.
どちらのUKFも基本的に 1) 状態遷移関数&観測関数を定義, 2) UKF初期化, 3) 観測を基にUKF更新, で状態推定を行うことができます.
# 1) definition of the transition and observation functions # calculate the next state def transition(state, noise): next_state = ... return next_state + noise # calculate the expected observation def observation(state, noise): expected_observation = ... return expected_observation + noise # 2) initialize UKF transition_cov = numpy.eye(N) # transition noise covariance matrix observation_cov = numpy.eye(M) # observation noise covariance matrix mean = numpy.zeros(N) # initial mean covs = numpy.eye(N) # initial covariance ukf = pykalman.UnscentedKalmanFilter(transition, observation, transition_cov, observation_cov, mean, cov) # 3) update mean and cov with observations while True: observation = ... mean, cov = ukf.filter_update(mean, cov, observation)
前述したように今回の例のようにノイズが状態,観測に加算されるだけの場合にはAdditiveUnscentedKalmanFilterを使ったほうが,計算量・安定性的に良いです.AdditiveUnscentedKalmanFilterの場合にはノイズは加算されるだけと決まっているので,状態遷移・観測関数には現状態ベクトルのみ渡されて,pykalman側で勝手にノイズを足してくれます.下のような感じ.
def transition(state): next_state = ... return next_state def observation(state): expected_observation = ... return expected_observation ukf = pykalman.AdditiveUnscentedKalmanFilter(...)
デモとして,前にも作ってた距離センサを使った位置推定プログラムをpykalmanで書き直しました.github
#!/usr/bin/python
import cv2
import math
import numpy
import pykalman
# System to be estimated. We estimate a target position from observations taken from several range sensors.
# target position : p = [p.x, p.y]
# sensor positions: s0 = [s0.x, s0.y], ..., sn = [sn.x, sn.y]
# state space : x = p
# observation : y = [|p - s0|, |p - s1|, ..., |p - sn|]
class System:
# constructor
def __init__(self):
# true target and sensor positions
self.target = numpy.float32((256, 256))
self.sensors = numpy.float32([(128, 128), (384, 384), (384, 128)])
cv2.namedWindow('canvas')
cv2.setMouseCallback('canvas', self.mouse_callback)
# generates an observation vector
# [|p - s0|, |p - s1|, ..., |p - sn|] + noise
def measurement(self):
dists = numpy.linalg.norm(self.sensors - self.target, axis=1)
return dists + numpy.abs(numpy.random.normal(0.0, 25.0, dists.size))
# updates the target position
def mouse_callback(self, event, x, y, flags, userdata):
if event == cv2.EVENT_LBUTTONDOWN:
self.pushed = True
elif event == cv2.EVENT_LBUTTONUP:
self.pushed = False
if hasattr(self, 'pushed') and self.pushed:
self.target = numpy.float32((x, y))
# draw
def draw(self, filter, measurement):
canvas = numpy.ones((512, 512, 3), dtype=numpy.uint8) * 255
# draw sensors and observations
for r, sensor in zip(measurement, self.sensors):
cv2.circle(canvas, tuple(sensor.astype(numpy.int32)), 5, (0, 255, 0), 2)
cv2.circle(canvas, tuple(sensor.astype(numpy.int32)), int(r), (128, 128, 128), 1)
# draw true target position
cv2.circle(canvas, tuple(self.target.astype(numpy.int32)), 2, (255, 0, 0), 2)
# draw estimated target position and error ellipse
cv2.circle(canvas, tuple(filter.mean.astype(numpy.int32)), 5, (0, 0, 255), 2)
eigenvalues, eigenvectors = numpy.linalg.eig(filter.cov)
idx = [x[0] for x in sorted(enumerate(eigenvalues), key=lambda x: x[1])]
angle = math.degrees(math.atan2(eigenvectors[0, idx[1]], eigenvectors[1, idx[1]]))
axes = (int(eigenvalues[idx[1]]), int(eigenvalues[idx[0]]))
cv2.ellipse(canvas, tuple(filter.mean.astype(numpy.int32)), axes, angle, 0, 360, (0, 0, 255))
cv2.imshow('canvas', canvas)
# Target position estimator using UKF
class Filter:
def __init__(self, sensors):
self.sensors = sensors
# initialize ukf
trans_cov = numpy.eye(2) * 20
obs_cov = numpy.eye(sensors.shape[0]) * 100
self.mean = numpy.random.normal(256, 256, 2)
self.cov = numpy.eye(2) * 128
# self.ukf = pykalman.UnscentedKalmanFilter(self.transition, self.observation, trans_cov, obs_cov, self.mean, self.cov)
self.ukf = pykalman.AdditiveUnscentedKalmanFilter(self.transition_, self.observation_, trans_cov, obs_cov, self.mean, self.cov)
# transition function for usual UKF
def transition(self, state, noise):
return state + noise
# observation function for usual UKF
def observation(self, state, noise):
expected = numpy.linalg.norm(self.sensors - state, axis=-1)
return expected + noise
# transition function for Additive UKF
def transition_(self, state):
return state
# observation function for Additive UKF
def observation_(self, state):
expected = numpy.linalg.norm(self.sensors - state, axis=-1)
return expected
# update state using ukf
def update(self, measurement):
self.mean, self.cov = self.ukf.filter_update(self.mean, self.cov, measurement)
# entry point
def main():
system = System()
filter = Filter(system.sensors)
while cv2.waitKey(50) != 0x1b:
obs = system.measurement()
filter.update(obs)
system.draw(filter, obs)
if __name__ == '__main__':
main()
pykalmanですが,通常の線形カルマンフィルタに関してもパラメータ推定など,いい感じの機能が揃っているので使い勝手が良いです.それから観測データが全て揃ってからのオフライン推定についてもカルマンスムーサーをKF, UKFどちらにも提供しているのでそちらについてもそのうち書きたいと思います.
