Rolling Average of Sensor Readings
rolling windowdequeIoT sensorsreal-time
Scenario: You are building a data pipeline for IoT sensors (like BESS, PV inverters, weather stations). Each sensor sends temperature data every few seconds in this format:
1
2
3
4
5
6
2025-10-11T13:45:20Z sensor_1 25.4
2025-10-11T13:45:25Z sensor_1 26.1
2025-10-11T13:45:30Z sensor_2 22.8
2025-10-11T13:45:35Z sensor_1 27.0
2025-10-11T13:45:40Z sensor_2 23.4
...
Each line has:
- timestamp (ISO8601)
- sensor_id
- temperature (float, degrees C)
The stream never ends. Input could be a file, MQTT, or Kafka, but treat it as continuous input.
Task:
Write a Python program that:
- Continuously reads incoming sensor data, line by line.
- For each sensor, keeps a rolling average temperature over the last 3 readings.
- Each time a new reading arrives, prints:
1
<timestamp> <sensor_id> <rolling_average>
Example:
1
2025-10-11T13:45:35Z sensor_1 26.17
Bonus Challenges:
- Make it memory efficient. Don’t store all history.
- Handle many sensors dynamically.
- Handle malformed lines without crashing.
- Make the code easy to extend to a real streaming consumer (Kafka, MQTT).
Tips:
- Think about how to store only the last 3 values per sensor.
- A
dequeworks well here, or a list with slicing. - Aim for clean, production-like structure, not just working code.
Try the problem on your own first. Solutions are most valuable after you've struggled with it.
Reference implementation — solution.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
#!/usr/bin/env python3
"""
Rolling Average Temperature Processor
Author: Amirul Islam
Description:
Continuously reads sensor readings from a stream and prints
a rolling average over the last 3 readings for each sensor.
"""
import sys
from collections import defaultdict, deque
from typing import Deque, Dict
def compute_rolling_average(file_stream=sys.stdin) -> None:
"""
Reads sensor data from a stream and prints rolling averages
of the last 3 readings per sensor.
Expected input line format:
<timestamp> <sensor_id> <temperature>
"""
# Use maxlen=3 to automatically maintain a rolling window
sensor_readings: Dict[str, Deque[float]] = defaultdict(lambda: deque(maxlen=3))
for line_num, line in enumerate(file_stream, 1):
parts = line.strip().split()
if len(parts) != 3:
# skip malformed lines
continue
timestamp, sensor_id, temp_str = parts
try:
temperature = float(temp_str)
except ValueError:
# skip bad temperature values
continue
# Update rolling window for the sensor
sensor_readings[sensor_id].append(temperature)
# Only print average when we have 3 readings
if len(sensor_readings[sensor_id]) == 3:
avg_temp = sum(sensor_readings[sensor_id]) / 3
print(f"{timestamp} {sensor_id} {avg_temp:.2f}")
def main():
print("Reading sensor data... (Ctrl+C to stop)")
try:
compute_rolling_average()
except KeyboardInterrupt:
print("\nStopping stream. Goodbye!")
if __name__ == "__main__":
main()