pythonでカルマンフィルタを扱えるpykalmanというライブラリがあるんですが,使い方をよく忘れるので簡単にメモしておきます.
pykalmanには線形カルマンフィルタと非線形のUnscentedカルマンフィルタ(UKF)の実装が用意されていて,更にUKFには通常のUnscentedKalmanFilterクラスとAdditiveUnscentedKalmanFilterクラスがあります.AdditiveUnscentedKalmanFilterは通常のUKFより計算が軽くてロバストですが,ノイズが単純な加算でモデル化できる場合のみに使用できます.
どちらのUKFも基本的に 1) 状態遷移関数&観測関数を定義, 2) UKF初期化, 3) 観測を基にUKF更新, で状態推定を行うことができます.
# 1) definition of the transition and observation functions # calculate the next state def transition(state, noise): next_state = ... return next_state + noise # calculate the expected observation def observation(state, noise): expected_observation = ... return expected_observation + noise # 2) initialize UKF transition_cov = numpy.eye(N) # transition noise covariance matrix observation_cov = numpy.eye(M) # observation noise covariance matrix mean = numpy.zeros(N) # initial mean covs = numpy.eye(N) # initial covariance ukf = pykalman.UnscentedKalmanFilter(transition, observation, transition_cov, observation_cov, mean, cov) # 3) update mean and cov with observations while True: observation = ... mean, cov = ukf.filter_update(mean, cov, observation)
前述したように今回の例のようにノイズが状態,観測に加算されるだけの場合にはAdditiveUnscentedKalmanFilterを使ったほうが,計算量・安定性的に良いです.AdditiveUnscentedKalmanFilterの場合にはノイズは加算されるだけと決まっているので,状態遷移・観測関数には現状態ベクトルのみ渡されて,pykalman側で勝手にノイズを足してくれます.下のような感じ.
def transition(state): next_state = ... return next_state def observation(state): expected_observation = ... return expected_observation ukf = pykalman.AdditiveUnscentedKalmanFilter(...)
デモとして,前にも作ってた距離センサを使った位置推定プログラムをpykalmanで書き直しました.github
#!/usr/bin/python import cv2 import math import numpy import pykalman # System to be estimated. We estimate a target position from observations taken from several range sensors. # target position : p = [p.x, p.y] # sensor positions: s0 = [s0.x, s0.y], ..., sn = [sn.x, sn.y] # state space : x = p # observation : y = [|p - s0|, |p - s1|, ..., |p - sn|] class System: # constructor def __init__(self): # true target and sensor positions self.target = numpy.float32((256, 256)) self.sensors = numpy.float32([(128, 128), (384, 384), (384, 128)]) cv2.namedWindow('canvas') cv2.setMouseCallback('canvas', self.mouse_callback) # generates an observation vector # [|p - s0|, |p - s1|, ..., |p - sn|] + noise def measurement(self): dists = numpy.linalg.norm(self.sensors - self.target, axis=1) return dists + numpy.abs(numpy.random.normal(0.0, 25.0, dists.size)) # updates the target position def mouse_callback(self, event, x, y, flags, userdata): if event == cv2.EVENT_LBUTTONDOWN: self.pushed = True elif event == cv2.EVENT_LBUTTONUP: self.pushed = False if hasattr(self, 'pushed') and self.pushed: self.target = numpy.float32((x, y)) # draw def draw(self, filter, measurement): canvas = numpy.ones((512, 512, 3), dtype=numpy.uint8) * 255 # draw sensors and observations for r, sensor in zip(measurement, self.sensors): cv2.circle(canvas, tuple(sensor.astype(numpy.int32)), 5, (0, 255, 0), 2) cv2.circle(canvas, tuple(sensor.astype(numpy.int32)), int(r), (128, 128, 128), 1) # draw true target position cv2.circle(canvas, tuple(self.target.astype(numpy.int32)), 2, (255, 0, 0), 2) # draw estimated target position and error ellipse cv2.circle(canvas, tuple(filter.mean.astype(numpy.int32)), 5, (0, 0, 255), 2) eigenvalues, eigenvectors = numpy.linalg.eig(filter.cov) idx = [x[0] for x in sorted(enumerate(eigenvalues), key=lambda x: x[1])] angle = math.degrees(math.atan2(eigenvectors[0, idx[1]], eigenvectors[1, idx[1]])) axes = (int(eigenvalues[idx[1]]), int(eigenvalues[idx[0]])) cv2.ellipse(canvas, tuple(filter.mean.astype(numpy.int32)), axes, angle, 0, 360, (0, 0, 255)) cv2.imshow('canvas', canvas) # Target position estimator using UKF class Filter: def __init__(self, sensors): self.sensors = sensors # initialize ukf trans_cov = numpy.eye(2) * 20 obs_cov = numpy.eye(sensors.shape[0]) * 100 self.mean = numpy.random.normal(256, 256, 2) self.cov = numpy.eye(2) * 128 # self.ukf = pykalman.UnscentedKalmanFilter(self.transition, self.observation, trans_cov, obs_cov, self.mean, self.cov) self.ukf = pykalman.AdditiveUnscentedKalmanFilter(self.transition_, self.observation_, trans_cov, obs_cov, self.mean, self.cov) # transition function for usual UKF def transition(self, state, noise): return state + noise # observation function for usual UKF def observation(self, state, noise): expected = numpy.linalg.norm(self.sensors - state, axis=-1) return expected + noise # transition function for Additive UKF def transition_(self, state): return state # observation function for Additive UKF def observation_(self, state): expected = numpy.linalg.norm(self.sensors - state, axis=-1) return expected # update state using ukf def update(self, measurement): self.mean, self.cov = self.ukf.filter_update(self.mean, self.cov, measurement) # entry point def main(): system = System() filter = Filter(system.sensors) while cv2.waitKey(50) != 0x1b: obs = system.measurement() filter.update(obs) system.draw(filter, obs) if __name__ == '__main__': main()
pykalmanですが,通常の線形カルマンフィルタに関してもパラメータ推定など,いい感じの機能が揃っているので使い勝手が良いです.それから観測データが全て揃ってからのオフライン推定についてもカルマンスムーサーをKF, UKFどちらにも提供しているのでそちらについてもそのうち書きたいと思います.