Schema Evolution and Validation for Streaming Events
Tags: JSON, schema evolution, type coercion, pydantic
Scenario: You are building a streaming pipeline that ingests user event data from many microservices. The data arrives as JSON Lines (one JSON object per line), like this:
```json
{"user_id": 101, "event_type": "login", "timestamp": "2025-10-14T12:00:00Z"}
{"user_id": 102, "event_type": "purchase", "amount": 59.99, "timestamp": "2025-10-14T12:02:15Z"}
{"user_id": "103", "event_type": "logout", "timestamp": "2025-10-14T12:05:20Z"}
{"event_type": "login", "timestamp": "2025-10-14T12:07:00Z"}
```
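Because each line is a self-contained JSON document, the input can be consumed one record at a time without loading the whole file. A minimal sketch, assuming the reader should accept any iterable of lines (an open file, `sys.stdin`, or an in-memory buffer for tests); `iter_jsonl` is a hypothetical name, and it reports bad lines instead of raising so that one malformed record cannot stop the stream:

```python
import io
import json
from typing import Any, Dict, Iterable, Iterator, Optional, Tuple


def iter_jsonl(
    lines: Iterable[str],
) -> Iterator[Tuple[int, Optional[Dict[str, Any]], Optional[str]]]:
    """Yield (line_number, event, error) per non-blank line; exactly one of event/error is None."""
    for line_number, line in enumerate(lines, 1):
        line = line.strip()
        if not line:
            continue  # tolerate blank lines between records
        try:
            obj = json.loads(line)
        except json.JSONDecodeError as exc:
            yield line_number, None, f"invalid JSON: {exc.msg}"
            continue
        if not isinstance(obj, dict):
            # Valid JSON, but a list or number cannot be an event.
            yield line_number, None, "JSON value is not an object"
            continue
        yield line_number, obj, None


# In-memory stand-in for events.jsonl: one good line, a blank, a truncated object, an array.
sample = io.StringIO(
    '{"user_id": 101, "event_type": "login", "timestamp": "2025-10-14T12:00:00Z"}\n'
    "\n"
    '{"user_id": "103", "event_type": "logout"\n'
    "[1, 2, 3]\n"
)
results = list(iter_jsonl(sample))
```

With a real file, `with open("events.jsonl", encoding="utf-8") as f: for n, event, err in iter_jsonl(f): ...` keeps memory use bounded by the longest line.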
Because these events come from different teams and service versions, they are often incomplete or inconsistent, and their shape changes over time:
- `user_id` might be a string or an int.
- `amount` may appear only for purchase events.
- Some fields may be missing.
- Future versions may add new fields (like `device` or `location`).
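The `user_id` inconsistency is a classic type-coercion problem: plain `int()` accepts `"103"`, but it also accepts `True` (returning 1) and silently truncates `101.7` to 101. A minimal sketch of a stricter coercion helper, assuming the policy is to accept ints, integral floats, and digit strings while rejecting booleans and fractional floats (that policy and the name `coerce_user_id` are assumptions, not part of the task statement):

```python
from typing import Any, Optional


def coerce_user_id(value: Any) -> Optional[int]:
    """Return value as an int, or None if it cannot be converted safely.

    Assumed policy: accept ints, integral floats, and numeric strings;
    reject bools and fractional floats, which int() would accept or truncate.
    """
    if isinstance(value, bool):  # bool is a subclass of int: int(True) == 1
        return None
    if isinstance(value, float):
        # 101.0 -> 101; 101.7, nan and inf -> None
        return int(value) if value.is_integer() else None
    try:
        return int(value)  # ints and strings such as "103" or " 103 "
    except (TypeError, ValueError):
        return None  # None, "abc", lists, dicts, ...
```

Returning `None` rather than raising keeps the caller's control flow flat: a single `if user_id is None:` branch can attach the error reason.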
Your goal: validate and normalize these events before they go to a downstream system (BigQuery, Kafka, Pub/Sub).
Task:
Write a Python program that:
- Reads a JSON Lines file (`events.jsonl`) line by line, treating it as streaming input.
- Validates and normalizes each event against this expected schema:
| Field | Type | Required | Notes |
|---|---|---|---|
| user_id | int | Yes | Convert to int if possible. Skip event if missing or invalid. |
| event_type | str | Yes | Must be one of "login", "logout", "purchase" |
| timestamp | str | Yes | Must be valid ISO8601 |
| amount | float | No | Relevant only for "purchase" events (always 0.0 for other types). Default to 0.0 if missing. |
| device | str | No | Optional new field. Can be present or not. |
- Writes valid, normalized events to `cleaned_events.jsonl`.
- Writes invalid events to `invalid_events.jsonl` with an extra `"error_reason"` field describing why each one failed.
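For the `timestamp` row, one portable check is to let `datetime.fromisoformat` do the parsing. Before Python 3.11 it rejects a trailing `Z`, so the sketch below rewrites it as `+00:00` first. Note the assumption: "valid ISO 8601" here means "parseable by the running Python's `fromisoformat`", which covers most of ISO 8601 on 3.11+ and a narrower subset before that. `is_iso8601` is a hypothetical name.

```python
from datetime import datetime
from typing import Any


def is_iso8601(value: Any) -> bool:
    """Return True if value is a string that datetime.fromisoformat can parse."""
    if not isinstance(value, str):
        return False  # epoch integers, null, etc. are rejected rather than coerced
    try:
        # Python < 3.11 rejects a trailing "Z"; spell UTC as "+00:00" for portability.
        datetime.fromisoformat(value.replace("Z", "+00:00"))
    except ValueError:
        return False
    return True
```

This accepts time-zone-naive strings such as `2025-10-14T12:00:00`; if downstream systems need an explicit offset, checking that the parsed value's `tzinfo` is not `None` is a small extension.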
Example Output (cleaned_events.jsonl):
```json
{"user_id": 101, "event_type": "login", "timestamp": "2025-10-14T12:00:00Z", "amount": 0.0}
{"user_id": 102, "event_type": "purchase", "timestamp": "2025-10-14T12:02:15Z", "amount": 59.99}
{"user_id": 103, "event_type": "logout", "timestamp": "2025-10-14T12:05:20Z", "amount": 0.0}
```
Example Output (invalid_events.jsonl; the second line comes from an event not included in the sample input above):
```json
{"event_type": "login", "timestamp": "2025-10-14T12:07:00Z", "error_reason": "missing user_id"}
{"user_id": "abc", "event_type": "purchase", "timestamp": "2025-10-14T12:09:00Z", "amount": "NaN", "error_reason": "user_id not convertible to int"}
```
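Each invalid line keeps the original payload plus an `error_reason` key, so the file can be replayed once the producing service is fixed. One way to build that record without mutating the parsed event (the reference solution below instead writes `error_reason` into the event dict in place) is a dict merge; `reject` is a hypothetical helper name:

```python
import json
from typing import Any, Dict


def reject(event: Dict[str, Any], reason: str) -> Dict[str, Any]:
    """Return a copy of the raw event with an error_reason field appended."""
    # The merge copies the event, so the caller's dict is untouched;
    # dicts preserve insertion order, so error_reason is serialized last.
    return {**event, "error_reason": reason}


raw = {"event_type": "login", "timestamp": "2025-10-14T12:07:00Z"}
line = json.dumps(reject(raw, "missing user_id"))
```

With default `json.dumps` separators, `line` matches the first example row above character for character.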
Bonus Challenges (Highly Recommended):
- Make your schema evolution-proof. Ignore unknown fields instead of failing.
- Keep counters: total events processed, valid, invalid.
- Use `pydantic` for validation (optional, but very handy).
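For the counters bonus, `collections.Counter` keeps the bookkeeping to a few lines. The sketch below assumes each processed event is reduced to either `None` (valid) or its `error_reason` string; the per-reason breakdown is an addition beyond the three counters the task asks for, and `summarize` is a hypothetical name.

```python
from collections import Counter
from typing import Iterable, Optional


def summarize(error_reasons: Iterable[Optional[str]]) -> Counter:
    """Tally outcomes: None means the event was valid; a string is its error_reason."""
    stats: Counter = Counter()
    for reason in error_reasons:
        stats["total"] += 1
        if reason is None:
            stats["valid"] += 1
        else:
            stats["invalid"] += 1
            # Per-reason counts show which producer bug dominates.
            stats[f"invalid:{reason}"] += 1
    return stats


stats = summarize(
    [None, None, "missing user_id", "user_id not convertible to int", "missing user_id"]
)
```

Because the function consumes an iterable, it can run over a generator in the same single pass as validation, keeping memory constant.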
Hints:
- Iterate over the file and call `json.loads()` on each line, so only one line is held in memory at a time.
- Use `try`/`except` around type conversions.
- Keep validation and normalization in one clean function.
- Think about how you would handle new fields without code changes. That is the heart of schema evolution.
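One approach to the last hint is to make optional fields data rather than code: a registry maps each allowed optional field to a check, and anything not registered is ignored. A minimal sketch, assuming optional fields are non-empty strings (the registry name `OPTIONAL_FIELDS`, the helper `copy_optional_fields`, and the `location` check are hypothetical). Supporting a new field then means adding one entry, or loading the registry from configuration, instead of editing the validation logic:

```python
from typing import Any, Callable, Dict

# Hypothetical registry: field name -> predicate the value must satisfy.
OPTIONAL_FIELDS: Dict[str, Callable[[Any], bool]] = {
    "device": lambda v: isinstance(v, str) and v != "",
    "location": lambda v: isinstance(v, str) and v != "",
}


def copy_optional_fields(event: Dict[str, Any], target: Dict[str, Any]) -> Dict[str, Any]:
    """Copy registered optional fields that pass their check into target; ignore the rest."""
    for name, is_valid in OPTIONAL_FIELDS.items():
        if name in event and is_valid(event[name]):
            target[name] = event[name]
    return target


event = {"user_id": 1, "device": "ios", "location": "", "experiment_id": "x9"}
out = copy_optional_fields(event, {"user_id": 1})
```

The alternative of passing every unknown field through unchanged needs no registry, but it lets producers push unreviewed columns into BigQuery or downstream topics; the allow-list trades a one-line change per field for that protection.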
Try the problem on your own first. Solutions are most valuable after you've struggled with it.
Reference implementation — solution.py
```python
#!/usr/bin/env python3
"""
Validate and Normalize Streaming User Events (Schema Evolution Ready)
Author: Amirul Islam
Description:
    Streams JSON events line by line, validates and normalizes them
    against an evolving schema, and separates valid and invalid events.
Complexity:
    Time: O(n), one pass over events.
    Space: O(1) beyond the current line; events are processed one at a time.
"""
import json
import math
from datetime import datetime
from typing import Any, Dict, Iterator, Optional

RAW_EVENTS_FILE_PATH = "../../data/events.jsonl"
CLEANED_EVENTS_FILE_PATH = "../../data/cleaned_events.jsonl"
INVALID_EVENTS_FILE_PATH = "../../data/invalid_events.jsonl"

VALID_EVENT_TYPES = {"login", "logout", "purchase"}


def read_json_lines(file_path: str) -> Iterator[Dict[str, Any]]:
    """Stream JSON objects line by line (memory-safe)."""
    with open(file_path, "r", encoding="utf-8") as file:
        for line_number, line in enumerate(file, 1):
            line = line.strip()
            if not line:
                continue  # skip blank lines
            try:
                obj = json.loads(line)
            except json.JSONDecodeError:
                yield {"_raw": line, "error_reason": f"invalid_json_at_line_{line_number}"}
                continue
            # Valid JSON that is not an object (e.g. a list or number) cannot be an event.
            if not isinstance(obj, dict):
                yield {"_raw": line, "error_reason": f"non_object_json_at_line_{line_number}"}
                continue
            yield obj


def is_valid_iso8601(ts: Any) -> bool:
    """Check if timestamp is an ISO 8601 string parseable by datetime.fromisoformat."""
    if not isinstance(ts, str):
        return False  # covers missing (None) and non-string timestamps
    try:
        # Python < 3.11 rejects a trailing "Z", so map it to "+00:00" first.
        datetime.fromisoformat(ts.replace("Z", "+00:00"))
        return True
    except ValueError:
        return False


def normalize_event(event: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """
    Validate and normalize a single event.
    Returns the normalized dict if valid; otherwise returns None
    and records the failure in event["error_reason"].
    """
    # --- Validate user_id ---
    if "user_id" not in event:
        event["error_reason"] = "missing user_id"
        return None
    raw_user_id = event["user_id"]
    # bool is a subclass of int, and int(101.7) silently truncates; reject both.
    if isinstance(raw_user_id, bool) or (
        isinstance(raw_user_id, float) and not raw_user_id.is_integer()
    ):
        event["error_reason"] = "user_id not convertible to int"
        return None
    try:
        user_id = int(raw_user_id)
    except (ValueError, TypeError):
        event["error_reason"] = "user_id not convertible to int"
        return None

    # --- Validate event_type ---
    event_type = str(event.get("event_type", "")).lower()
    if event_type not in VALID_EVENT_TYPES:
        event["error_reason"] = f"invalid event_type '{event_type}'"
        return None

    # --- Validate timestamp ---
    timestamp = event.get("timestamp")
    if not is_valid_iso8601(timestamp):
        event["error_reason"] = "invalid or missing timestamp"
        return None

    # --- Normalize amount (non-purchase events always get 0.0) ---
    amount = 0.0
    if event_type == "purchase":
        try:
            amount = float(event.get("amount", 0.0))
        except (ValueError, TypeError):
            event["error_reason"] = "invalid amount for purchase"
            return None
        # float("NaN") and float("inf") succeed but are not valid amounts (or valid JSON).
        if not math.isfinite(amount):
            event["error_reason"] = "invalid amount for purchase"
            return None

    # --- Optional fields (schema evolution safe) ---
    device = event.get("device") if isinstance(event.get("device"), str) else None

    # --- Build normalized record; unknown fields are dropped here ---
    normalized = {
        "user_id": user_id,
        "event_type": event_type,
        "timestamp": timestamp,
        "amount": round(amount, 2),
    }
    # Keep the optional field only if it is a non-empty string.
    if device:
        normalized["device"] = device
    return normalized


def main() -> None:
    total = valid = invalid = 0
    # Parenthesized context managers require Python 3.10+.
    with (
        open(CLEANED_EVENTS_FILE_PATH, "w", encoding="utf-8") as clean_f,
        open(INVALID_EVENTS_FILE_PATH, "w", encoding="utf-8") as invalid_f,
    ):
        for event in read_json_lines(RAW_EVENTS_FILE_PATH):
            total += 1
            # Lines that could not be decoded already carry an error_reason.
            if "_raw" in event:
                invalid += 1
                invalid_f.write(json.dumps(event, ensure_ascii=False) + "\n")
                continue
            normalized = normalize_event(event)
            if normalized is not None:
                valid += 1
                clean_f.write(json.dumps(normalized, ensure_ascii=False) + "\n")
            else:
                invalid += 1
                invalid_f.write(json.dumps(event, ensure_ascii=False) + "\n")
    print(
        f"Validation complete. "
        f"Processed {total} events: {valid} valid, {invalid} invalid."
    )


if __name__ == "__main__":
    main()
```
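One pitfall worth testing in any solution: `float()` accepts `"NaN"` and `"inf"`, and `json.dumps` by default writes non-finite floats as bare `NaN`/`Infinity` tokens, which are not valid JSON and are rejected by strict parsers downstream. A minimal sketch of a guarded amount parser plus a demonstration of the serializer behavior; `parse_amount` is a hypothetical name, and returning `None` for unusable values is an assumed convention:

```python
import json
import math
from typing import Any, Optional


def parse_amount(value: Any) -> Optional[float]:
    """Return value as a finite float rounded to 2 decimals, or None if it is unusable."""
    try:
        amount = float(value)  # raises for "abc", None, lists, ...
    except (TypeError, ValueError):
        return None
    if not math.isfinite(amount):
        # float("NaN") and float("inf") parse without error but are not real amounts.
        return None
    return round(amount, 2)


# By default json.dumps writes NaN as a bare token, which strict JSON parsers reject.
lenient_output = json.dumps({"amount": float("nan")})

# allow_nan=False turns that silent corruption into a ValueError at write time.
try:
    json.dumps({"amount": float("nan")}, allow_nan=False)
    strict_accepted_nan = True
except ValueError:
    strict_accepted_nan = False
```

Passing `allow_nan=False` to the `json.dumps` call that writes `cleaned_events.jsonl` is a cheap second line of defense; it is less suitable for the invalid-events file, which may legitimately echo back whatever the producer sent.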