Implementing the Kalman Filter in Python
This is Part 5 of an 8-part series on Kalman Filtering. Part 4 provided the complete mathematical derivation.
From Mathematics to Code
Having derived the Kalman filter equations, we now transform mathematical theory into practical Python code. This post will guide you through building a robust, well-documented implementation that handles real-world considerations.
The Basic Structure
Let’s start with a clean, object-oriented approach for a 1D position/velocity tracker:
import numpy as np
import matplotlib.pyplot as plt
from typing import Tuple, Optional


class KalmanFilter1D:
    """
    1D Kalman Filter for position/velocity tracking

    State vector: [position, velocity]
    Measurement: position only
    """
System Modeling
State Definition
For our example, we’ll track a moving object with:
- State vector: x = [position, velocity]ᵀ
- Measurement: position only
- Dynamics: constant velocity with acceleration noise
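In state-space form (carried over from the derivation in Part 4), the model we are about to implement is:

$$
x_k = F x_{k-1} + w_k, \qquad w_k \sim \mathcal{N}(0, Q)
$$

$$
z_k = H x_k + v_k, \qquad v_k \sim \mathcal{N}(0, R)
$$

$$
F = \begin{bmatrix} 1 & \Delta t \\ 0 & 1 \end{bmatrix}, \qquad
H = \begin{bmatrix} 1 & 0 \end{bmatrix}
$$

These are exactly the F and H matrices built in the System Matrices section below.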
The Constructor
    def __init__(self,
                 dt: float = 1.0,
                 process_noise: float = 0.1,
                 measurement_noise: float = 1.0,
                 initial_position: float = 0.0,
                 initial_velocity: float = 0.0,
                 initial_uncertainty: float = 1000.0):
        """
        Initialize the Kalman filter

        Args:
            dt: Time step
            process_noise: Process noise variance
            measurement_noise: Measurement noise variance
            initial_position: Initial position estimate
            initial_velocity: Initial velocity estimate
            initial_uncertainty: Initial uncertainty
        """
        self.dt = dt

        # State vector [position, velocity]
        self.x = np.array([[initial_position],
                           [initial_velocity]], dtype=float)
System Matrices
The key to any Kalman filter implementation is correctly defining the system matrices:
        # State transition matrix (constant velocity model)
        self.F = np.array([[1.0, dt],
                           [0.0, 1.0]], dtype=float)

        # Measurement matrix (we only measure position)
        self.H = np.array([[1.0, 0.0]], dtype=float)

        # Process noise covariance matrix
        self.Q = np.array([[dt**4/4, dt**3/2],
                           [dt**3/2, dt**2]], dtype=float) * process_noise

        # Measurement noise covariance
        self.R = np.array([[measurement_noise]], dtype=float)

        # Error covariance matrix
        self.P = np.eye(2) * initial_uncertainty

        # Identity matrix, needed later in the covariance update
        self.I = np.eye(2)
Understanding the Process Noise Matrix
The process noise matrix Q deserves special attention. For a constant velocity model with acceleration noise:
        # Continuous-time acceleration noise gets discretized as:
        self.Q = np.array([[dt**4/4, dt**3/2],
                           [dt**3/2, dt**2]], dtype=float) * process_noise
This comes from integrating white acceleration noise over the time interval dt.
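One way to arrive at this form: treat the acceleration over each step as zero-mean white noise with variance σ²ₐ, entering the state through the vector G = [Δt²/2, Δt]ᵀ. The discrete process noise is then the outer product

$$
Q = \sigma_a^2 \, G G^{\mathsf{T}} = \sigma_a^2
\begin{bmatrix} \Delta t^4/4 & \Delta t^3/2 \\ \Delta t^3/2 & \Delta t^2 \end{bmatrix},
\qquad
G = \begin{bmatrix} \Delta t^2/2 \\ \Delta t \end{bmatrix}
$$

so the process_noise constructor argument plays the role of the acceleration variance σ²ₐ.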
The Prediction Step
    def predict(self, control_input: Optional[float] = None) -> Tuple[np.ndarray, np.ndarray]:
        """
        Prediction step of the Kalman filter

        Args:
            control_input: Optional control input (acceleration)

        Returns:
            Predicted state and covariance
        """
        # State prediction: x̂ₖ|ₖ₋₁ = Fₖx̂ₖ₋₁|ₖ₋₁ + Bₖuₖ
        self.x = self.F @ self.x

        # Add control input if provided
        if control_input is not None:
            B = np.array([[0.5 * self.dt**2],
                          [self.dt]], dtype=float)
            self.x += B * control_input

        # Covariance prediction: Pₖ|ₖ₋₁ = FₖPₖ₋₁|ₖ₋₁Fₖᵀ + Qₖ
        self.P = self.F @ self.P @ self.F.T + self.Q

        return self.x.copy(), self.P.copy()
Key Implementation Details
- Matrix Multiplication: The @ operator keeps matrix operations clean and readable
- Control Input: Optional acceleration input with the proper control matrix B (see the short usage sketch below)
- Return Copies: Returning copies prevents external modification of internal state
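For instance, a prediction with a known acceleration looks like this (a minimal sketch; the parameter values are purely illustrative):

# Minimal usage sketch
kf = KalmanFilter1D(dt=0.1, process_noise=0.1, measurement_noise=1.0)
x_pred, P_pred = kf.predict(control_input=0.5)  # known acceleration of 0.5
print(x_pred.ravel())  # predicted [position, velocity]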
The Update Step
    def update(self, measurement: float) -> Tuple[np.ndarray, np.ndarray]:
        """
        Update step of the Kalman filter

        Args:
            measurement: Position measurement

        Returns:
            Updated state and covariance
        """
        # Convert measurement to a numpy column vector
        z = np.array([[measurement]], dtype=float)

        # Innovation (residual): ỹₖ = zₖ - Hₖx̂ₖ|ₖ₋₁
        y = z - self.H @ self.x

        # Innovation covariance: Sₖ = HₖPₖ|ₖ₋₁Hₖᵀ + Rₖ
        S = self.H @ self.P @ self.H.T + self.R

        # Kalman gain: Kₖ = Pₖ|ₖ₋₁HₖᵀSₖ⁻¹
        K = self.P @ self.H.T @ np.linalg.inv(S)

        # State update: x̂ₖ|ₖ = x̂ₖ|ₖ₋₁ + Kₖỹₖ
        self.x = self.x + K @ y

        # Covariance update (Joseph form for numerical stability)
        I_KH = self.I - K @ self.H
        self.P = I_KH @ self.P @ I_KH.T + K @ self.R @ K.T

        return self.x.copy(), self.P.copy()
Numerical Stability: The Joseph Form
Notice we use the Joseph form for covariance update:
# Joseph form (numerically stable)
I_KH = self.I - K @ self.H
self.P = I_KH @ self.P @ I_KH.T + K @ self.R @ K.T
# Instead of standard form (can lose positive definiteness):
# self.P = (self.I - K @ self.H) @ self.P
The Joseph form preserves symmetry and positive semi-definiteness even in the presence of rounding errors, making it the safer choice for robust implementations.
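A quick way to confirm this in practice is to inspect P after a long run. The check below is a sketch, assuming kf is a KalmanFilter1D instance that has already processed many measurements:

# Sanity check: P should remain symmetric with non-negative eigenvalues
assert np.allclose(kf.P, kf.P.T), "P lost symmetry"
assert np.all(np.linalg.eigvalsh(kf.P) > -1e-9), "P lost positive semi-definiteness"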
Utility Methods
State Access
    def get_state(self) -> dict:
        """Get current state estimates"""
        return {
            'position': self.x[0, 0],
            'velocity': self.x[1, 0],
            'position_uncertainty': np.sqrt(self.P[0, 0]),
            'velocity_uncertainty': np.sqrt(self.P[1, 1])
        }
Data Generation for Testing
def generate_true_trajectory(num_steps: int, dt: float, initial_pos: float = 0.0,
                             velocity: float = 1.0, acceleration: float = 0.1) -> np.ndarray:
    """Generate a true trajectory with constant acceleration"""
    t = np.arange(num_steps) * dt
    positions = initial_pos + velocity * t + 0.5 * acceleration * t**2
    return positions


def generate_measurements(true_positions: np.ndarray, noise_std: float = 1.0) -> np.ndarray:
    """Generate noisy measurements from true positions"""
    noise = np.random.normal(0, noise_std, len(true_positions))
    return true_positions + noise
Complete Demonstration
def demo_kalman_filter():
    """Demonstrate the Kalman filter with a simple example"""
    print("Kalman Filter Demonstration")
    print("=" * 40)

    # Simulation parameters
    num_steps = 50
    dt = 1.0
    true_velocity = 1.0
    true_acceleration = 0.1
    measurement_noise_std = 2.0

    # Generate true trajectory
    true_positions = generate_true_trajectory(num_steps, dt, 0.0, true_velocity, true_acceleration)
    true_velocities = np.gradient(true_positions, dt)

    # Generate noisy measurements
    measurements = generate_measurements(true_positions, measurement_noise_std)

    # Initialize Kalman filter
    kf = KalmanFilter1D(
        dt=dt,
        process_noise=0.1,
        measurement_noise=measurement_noise_std**2,
        initial_position=0.0,
        initial_velocity=0.0,
        initial_uncertainty=100.0
    )

    # Track estimates
    estimated_positions = []
    estimated_velocities = []
    position_uncertainties = []

    # Run filter
    for i in range(num_steps):
        # Predict step
        kf.predict()

        # Update step with measurement
        kf.update(measurements[i])

        # Store results
        state = kf.get_state()
        estimated_positions.append(state['position'])
        estimated_velocities.append(state['velocity'])
        position_uncertainties.append(state['position_uncertainty'])

    # Calculate errors
    position_errors = np.array(estimated_positions) - true_positions
    velocity_errors = np.array(estimated_velocities) - true_velocities

    print(f"Position RMSE: {np.sqrt(np.mean(position_errors**2)):.3f}")
    print(f"Velocity RMSE: {np.sqrt(np.mean(velocity_errors**2)):.3f}")
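Although matplotlib is imported at the top, the demo above only prints RMSE values. A plotting block like the following (a sketch meant to sit at the end of demo_kalman_filter, reusing its local variables) makes the comparison visual:

    # Visualize the result (continuing inside demo_kalman_filter)
    estimated_positions = np.array(estimated_positions)
    position_uncertainties = np.array(position_uncertainties)

    plt.figure(figsize=(10, 5))
    plt.plot(true_positions, 'k-', label='True position')
    plt.plot(measurements, '.', alpha=0.5, label='Measurements')
    plt.plot(estimated_positions, 'r-', label='Kalman estimate')
    plt.fill_between(np.arange(num_steps),
                     estimated_positions - 2 * position_uncertainties,
                     estimated_positions + 2 * position_uncertainties,
                     alpha=0.2, label='±2σ band')
    plt.xlabel('Time step')
    plt.ylabel('Position')
    plt.legend()
    plt.show()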
Advanced Implementation Considerations
1. Matrix Inversion Stability
For the innovation covariance inversion, consider:
# Standard approach (can be numerically unstable for ill-conditioned S)
K = self.P @ self.H.T @ np.linalg.inv(S)

# More stable: solve the linear system for K instead of forming S⁻¹ explicitly
# (S is symmetric, so solving S Kᵀ = (P Hᵀ)ᵀ gives K = P Hᵀ S⁻¹)
K = np.linalg.solve(S, (self.P @ self.H.T).T).T

# Even better: use a Cholesky decomposition (or SVD) for very ill-conditioned cases
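Here is a sketch of the Cholesky route, using scipy.linalg (an extra dependency not used elsewhere in this post). Since S is symmetric positive definite, factoring it once and solving for the gain avoids forming S⁻¹ at all:

from scipy.linalg import cho_factor, cho_solve

# We want K = P Hᵀ S⁻¹, i.e. the solution of S Kᵀ = (P Hᵀ)ᵀ
PHt = self.P @ self.H.T
K = cho_solve(cho_factor(S), PHt.T).T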
2. Handling Missing Measurements
    def update(self, measurement: Optional[float] = None):
        """Update step - skip if measurement is None"""
        if measurement is None:
            return self.x.copy(), self.P.copy()
        # Normal update logic...
3. Multi-Dimensional Extensions
For higher-dimensional systems:
class KalmanFilterND:
    def __init__(self, F, H, Q, R, initial_x, initial_P):
        """General N-dimensional Kalman filter"""
        self.F = np.array(F, dtype=float)  # State transition
        self.H = np.array(H, dtype=float)  # Measurement model
        self.Q = np.array(Q, dtype=float)  # Process noise
        self.R = np.array(R, dtype=float)  # Measurement noise
        self.x = np.array(initial_x, dtype=float).reshape(-1, 1)
        self.P = np.array(initial_P, dtype=float)
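The predict and update methods carry over almost unchanged from the 1D version. A minimal sketch (no control input, Joseph-form update) might look like this:

    def predict(self) -> Tuple[np.ndarray, np.ndarray]:
        # x̂ₖ|ₖ₋₁ = F x̂ₖ₋₁|ₖ₋₁,  Pₖ|ₖ₋₁ = F Pₖ₋₁|ₖ₋₁ Fᵀ + Q
        self.x = self.F @ self.x
        self.P = self.F @ self.P @ self.F.T + self.Q
        return self.x.copy(), self.P.copy()

    def update(self, z) -> Tuple[np.ndarray, np.ndarray]:
        z = np.array(z, dtype=float).reshape(-1, 1)
        y = z - self.H @ self.x                      # innovation
        S = self.H @ self.P @ self.H.T + self.R      # innovation covariance
        K = self.P @ self.H.T @ np.linalg.inv(S)     # Kalman gain
        self.x = self.x + K @ y
        I_KH = np.eye(self.P.shape[0]) - K @ self.H
        self.P = I_KH @ self.P @ I_KH.T + K @ self.R @ K.T  # Joseph form
        return self.x.copy(), self.P.copy()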
4. Parameter Validation
    def __init__(self, ...):
        # Validate inputs
        assert dt > 0, "Time step must be positive"
        assert process_noise > 0, "Process noise must be positive"
        assert measurement_noise > 0, "Measurement noise must be positive"
        assert initial_uncertainty > 0, "Initial uncertainty must be positive"
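One caveat: assert statements are stripped when Python runs with the -O flag, so for library-style code it is safer to raise exceptions. A sketch of the equivalent checks:

        if dt <= 0:
            raise ValueError("Time step must be positive")
        if process_noise <= 0:
            raise ValueError("Process noise must be positive")
        if measurement_noise <= 0:
            raise ValueError("Measurement noise must be positive")
        if initial_uncertainty <= 0:
            raise ValueError("Initial uncertainty must be positive")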
Performance Optimization Tips
1. Pre-allocate Arrays
# For long runs, pre-allocate result arrays
estimated_positions = np.zeros(num_steps)
estimated_velocities = np.zeros(num_steps)
2. Avoid Repeated Calculations
# Cache commonly used values
self.F_T = self.F.T # Transpose once, use many times
self.H_T = self.H.T
3. Use In-Place Operations When Possible
# In-place operations can be faster for large matrices
self.P += self.Q # Instead of self.P = self.P + self.Q
Debugging and Validation
1. Innovation Monitoring
    def check_innovation_consistency(self, innovations, S_history):
        """Check if innovations are properly normalized"""
        for i, (innov, S) in enumerate(zip(innovations, S_history)):
            # Normalized innovation should be ~ N(0, 1)
            normalized = innov / np.sqrt(S)
            if abs(normalized) > 3:  # 3-sigma test
                print(f"Warning: Large innovation at step {i}")
2. Covariance Validation
    def validate_covariance(self):
        """Ensure the covariance matrix remains positive definite"""
        # eigvalsh is the right choice here because P should be symmetric
        eigenvals = np.linalg.eigvalsh(self.P)
        if np.any(eigenvals <= 0):
            print("Warning: Covariance matrix not positive definite")
Key Takeaways
- Clean Architecture: Object-oriented design makes the filter reusable and maintainable
- Numerical Stability: Use the Joseph form for covariance updates and stable matrix operations
- Type Safety: Use type hints and input validation for robust code
- Modular Design: Separate prediction, update, and utility functions
- Real-World Considerations: Handle missing measurements and numerical edge cases
- Testing Framework: Build comprehensive test cases with known ground truth
Looking Forward
With a solid implementation foundation, our next post will explore real-world applications where this theory and code come together to solve practical problems in navigation, robotics, computer vision, and beyond.
The journey from mathematical equations to working code reveals both the elegance of the underlying theory and the care required for production-ready implementations.
Continue to Part 6: Real-World Applications of Kalman Filtering
Complete Code
The complete implementation is available as a standalone Python script. To run it:
pip install numpy matplotlib
python kalman_filter_demo.py
Experiment with different parameters to see how they affect filter performance:
- process_noise: How much the system can change unexpectedly
- measurement_noise: How noisy your measurements are
- initial_uncertainty: How confident you are in the initial estimates
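As a starting point, a small sweep like this sketch (reusing the helper functions defined above; the specific values are arbitrary) shows how process_noise trades smoothness against responsiveness:

np.random.seed(42)  # reproducible measurements
true_positions = generate_true_trajectory(num_steps=50, dt=1.0)
measurements = generate_measurements(true_positions, noise_std=2.0)

for q in (0.01, 0.1, 1.0):
    kf = KalmanFilter1D(dt=1.0, process_noise=q, measurement_noise=4.0)
    estimates = []
    for z in measurements:
        kf.predict()
        kf.update(z)
        estimates.append(kf.get_state()['position'])
    rmse = np.sqrt(np.mean((np.array(estimates) - true_positions) ** 2))
    print(f"process_noise={q}: position RMSE = {rmse:.3f}")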