Skip to the content.

Implementing the Kalman Filter in Python

This is Part 5 of an 8-part series on Kalman Filtering. Part 4 provided the complete mathematical derivation.

From Mathematics to Code

Having derived the Kalman filter equations, we now transform mathematical theory into practical Python code. This post will guide you through building a robust, well-documented implementation that handles real-world considerations.

The Basic Structure

Let’s start with a clean, object-oriented approach for a 1D position/velocity tracker:

import numpy as np
import matplotlib.pyplot as plt
from typing import Tuple, Optional

class KalmanFilter1D:
    """
    1D Kalman Filter for position/velocity tracking
    
    State vector: [position, velocity]
    Measurement: position only
    """

System Modeling

State Definition

For our example, we’ll track a moving object with:

  • State vector: x = [position, velocity]ᵀ
  • Measurement: position only
  • Dynamics: constant velocity with acceleration noise

The Constructor

def __init__(self, 
             dt: float = 1.0,
             process_noise: float = 0.1,
             measurement_noise: float = 1.0,
             initial_position: float = 0.0,
             initial_velocity: float = 0.0,
             initial_uncertainty: float = 1000.0):
    """
    Initialize the Kalman filter
    
    Args:
        dt: Time step
        process_noise: Process noise variance  
        measurement_noise: Measurement noise variance
        initial_position: Initial position estimate
        initial_velocity: Initial velocity estimate
        initial_uncertainty: Initial uncertainty
    """
    self.dt = dt
    
    # State vector [position, velocity]
    self.x = np.array([[initial_position], 
                      [initial_velocity]], dtype=float)

System Matrices

The key to any Kalman filter implementation is correctly defining the system matrices:

# State transition matrix (constant velocity model)
self.F = np.array([[1.0, dt],
                  [0.0, 1.0]], dtype=float)

# Measurement matrix (we only measure position)
self.H = np.array([[1.0, 0.0]], dtype=float)

# Process noise covariance matrix
self.Q = np.array([[dt**4/4, dt**3/2],
                  [dt**3/2, dt**2]], dtype=float) * process_noise

# Measurement noise covariance
self.R = np.array([[measurement_noise]], dtype=float)

# Error covariance matrix
self.P = np.eye(2) * initial_uncertainty

Understanding the Process Noise Matrix

The process noise matrix Q deserves special attention. For a constant velocity model with acceleration noise:

# Continuous-time acceleration noise gets discretized as:
self.Q = np.array([[dt**4/4, dt**3/2],
                  [dt**3/2, dt**2]], dtype=float) * process_noise

This comes from integrating white acceleration noise over the time interval dt.

The Prediction Step

def predict(self, control_input: Optional[float] = None) -> Tuple[np.ndarray, np.ndarray]:
    """
    Prediction step of Kalman filter
    
    Args:
        control_input: Optional control input (acceleration)
        
    Returns:
        Predicted state and covariance
    """
    # State prediction: x̂ₖ|ₖ₋₁ = Fₖx̂ₖ₋₁|ₖ₋₁ + Bₖuₖ
    self.x = self.F @ self.x
    
    # Add control input if provided
    if control_input is not None:
        B = np.array([[0.5 * self.dt**2], 
                     [self.dt]], dtype=float)
        self.x += B * control_input
    
    # Covariance prediction: Pₖ|ₖ₋₁ = FₖPₖ₋₁|ₖ₋₁Fₖᵀ + Qₖ
    self.P = self.F @ self.P @ self.F.T + self.Q
    
    return self.x.copy(), self.P.copy()

Key Implementation Details

  1. Matrix Multiplication: Using @ operator for clean matrix operations
  2. Control Input: Optional acceleration input with proper control matrix B
  3. Return Copies: Prevent external modification of internal state

The Update Step

def update(self, measurement: float) -> Tuple[np.ndarray, np.ndarray]:
    """
    Update step of Kalman filter
    
    Args:
        measurement: Position measurement
        
    Returns:
        Updated state and covariance
    """
    # Convert measurement to numpy array
    z = np.array([[measurement]], dtype=float)
    
    # Innovation (residual): ỹₖ = zₖ - Hₖx̂ₖ|ₖ₋₁
    y = z - self.H @ self.x
    
    # Innovation covariance: Sₖ = HₖPₖ|ₖ₋₁Hₖᵀ + Rₖ
    S = self.H @ self.P @ self.H.T + self.R
    
    # Kalman gain: Kₖ = Pₖ|ₖ₋₁Hₖᵀ Sₖ⁻¹
    K = self.P @ self.H.T @ np.linalg.inv(S)
    
    # State update: x̂ₖ|ₖ = x̂ₖ|ₖ₋₁ + Kₖỹₖ
    self.x = self.x + K @ y
    
    # Covariance update (Joseph form for numerical stability)
    I_KH = self.I - K @ self.H
    self.P = I_KH @ self.P @ I_KH.T + K @ self.R @ K.T
    
    return self.x.copy(), self.P.copy()

Numerical Stability: The Joseph Form

Notice we use the Joseph form for covariance update:

# Joseph form (numerically stable)
I_KH = self.I - K @ self.H
self.P = I_KH @ self.P @ I_KH.T + K @ self.R @ K.T

# Instead of standard form (can lose positive definiteness):
# self.P = (self.I - K @ self.H) @ self.P

The Joseph form guarantees positive definiteness even with numerical errors, making it essential for robust implementations.

Utility Methods

State Access

def get_state(self) -> dict:
    """Get current state estimates"""
    return {
        'position': self.x[0, 0],
        'velocity': self.x[1, 0],
        'position_uncertainty': np.sqrt(self.P[0, 0]),
        'velocity_uncertainty': np.sqrt(self.P[1, 1])
    }

Data Generation for Testing

def generate_true_trajectory(num_steps: int, dt: float, initial_pos: float = 0.0, 
                           velocity: float = 1.0, acceleration: float = 0.1) -> np.ndarray:
    """Generate true trajectory with constant acceleration"""
    t = np.arange(num_steps) * dt
    positions = initial_pos + velocity * t + 0.5 * acceleration * t**2
    return positions

def generate_measurements(true_positions: np.ndarray, noise_std: float = 1.0) -> np.ndarray:
    """Generate noisy measurements from true positions"""
    noise = np.random.normal(0, noise_std, len(true_positions))
    return true_positions + noise

Complete Demonstration

def demo_kalman_filter():
    """Demonstrate the Kalman filter with a simple example"""
    print("Kalman Filter Demonstration")
    print("=" * 40)
    
    # Simulation parameters
    num_steps = 50
    dt = 1.0
    true_velocity = 1.0
    true_acceleration = 0.1
    measurement_noise_std = 2.0
    
    # Generate true trajectory
    true_positions = generate_true_trajectory(num_steps, dt, 0.0, true_velocity, true_acceleration)
    true_velocities = np.gradient(true_positions, dt)
    
    # Generate noisy measurements
    measurements = generate_measurements(true_positions, measurement_noise_std)
    
    # Initialize Kalman filter
    kf = KalmanFilter1D(
        dt=dt,
        process_noise=0.1,
        measurement_noise=measurement_noise_std**2,
        initial_position=0.0,
        initial_velocity=0.0,
        initial_uncertainty=100.0
    )
    
    # Track estimates
    estimated_positions = []
    estimated_velocities = []
    position_uncertainties = []
    
    # Run filter
    for i in range(num_steps):
        # Predict step
        kf.predict()
        
        # Update step with measurement
        kf.update(measurements[i])
        
        # Store results
        state = kf.get_state()
        estimated_positions.append(state['position'])
        estimated_velocities.append(state['velocity'])
        position_uncertainties.append(state['position_uncertainty'])
    
    # Calculate errors
    position_errors = np.array(estimated_positions) - true_positions
    velocity_errors = np.array(estimated_velocities) - true_velocities
    
    print(f"Position RMSE: {np.sqrt(np.mean(position_errors**2)):.3f}")
    print(f"Velocity RMSE: {np.sqrt(np.mean(velocity_errors**2)):.3f}")

Advanced Implementation Considerations

1. Matrix Inversion Stability

For the innovation covariance inversion, consider:

# Standard approach (can be numerically unstable for ill-conditioned S)
K = self.P @ self.H.T @ np.linalg.inv(S)

# More stable approach using solve
K = self.P @ self.H.T @ np.linalg.solve(S, np.eye(S.shape[0]))

# Even better: use SVD or Cholesky decomposition for very ill-conditioned cases

2. Handling Missing Measurements

def update(self, measurement: Optional[float] = None):
    """Update step - skip if measurement is None"""
    if measurement is None:
        return self.x.copy(), self.P.copy()
    
    # Normal update logic...

3. Multi-Dimensional Extensions

For higher-dimensional systems:

class KalmanFilterND:
    def __init__(self, F, H, Q, R, initial_x, initial_P):
        """General N-dimensional Kalman filter"""
        self.F = np.array(F, dtype=float)  # State transition
        self.H = np.array(H, dtype=float)  # Measurement model
        self.Q = np.array(Q, dtype=float)  # Process noise
        self.R = np.array(R, dtype=float)  # Measurement noise
        self.x = np.array(initial_x, dtype=float).reshape(-1, 1)
        self.P = np.array(initial_P, dtype=float)

4. Parameter Validation

def __init__(self, ...):
    # Validate inputs
    assert dt > 0, "Time step must be positive"
    assert process_noise > 0, "Process noise must be positive"
    assert measurement_noise > 0, "Measurement noise must be positive"
    assert initial_uncertainty > 0, "Initial uncertainty must be positive"

Performance Optimization Tips

1. Pre-allocate Arrays

# For long runs, pre-allocate result arrays
estimated_positions = np.zeros(num_steps)
estimated_velocities = np.zeros(num_steps)

2. Avoid Repeated Calculations

# Cache commonly used values
self.F_T = self.F.T  # Transpose once, use many times
self.H_T = self.H.T

3. Use In-Place Operations When Possible

# In-place operations can be faster for large matrices
self.P += self.Q  # Instead of self.P = self.P + self.Q

Debugging and Validation

1. Innovation Monitoring

def check_innovation_consistency(self, innovations, S_history):
    """Check if innovations are properly normalized"""
    for i, (innov, S) in enumerate(zip(innovations, S_history)):
        # Normalized innovation should be ~ N(0,1)
        normalized = innov / np.sqrt(S)
        if abs(normalized) > 3:  # 3-sigma test
            print(f"Warning: Large innovation at step {i}")

2. Covariance Validation

def validate_covariance(self):
    """Ensure covariance matrix remains positive definite"""
    eigenvals = np.linalg.eigvals(self.P)
    if np.any(eigenvals <= 0):
        print("Warning: Covariance matrix not positive definite")

Key Takeaways

  1. Clean Architecture: Object-oriented design makes the filter reusable and maintainable

  2. Numerical Stability: Use Joseph form for covariance updates and stable matrix operations

  3. Type Safety: Use type hints and input validation for robust code

  4. Modular Design: Separate prediction, update, and utility functions

  5. Real-World Considerations: Handle missing measurements and numerical edge cases

  6. Testing Framework: Build comprehensive test cases with known ground truth

Looking Forward

With a solid implementation foundation, our next post will explore real-world applications where this theory and code come together to solve practical problems in navigation, robotics, computer vision, and beyond.

The journey from mathematical equations to working code reveals both the elegance of the underlying theory and the care required for production-ready implementations.

Continue to Part 6: Real-World Applications of Kalman Filtering


Complete Code

The complete implementation is available as a standalone Python script. To run it:

pip install numpy matplotlib
python kalman_filter_demo.py

Experiment with different parameters to see how they affect filter performance:

  • process_noise: How much the system can change unexpectedly
  • measurement_noise: How noisy your measurements are
  • initial_uncertainty: How confident you are in initial estimates

Keep Reading