Troubleshooting Guide¶
Quick Reference¶
| Symptom | Likely Cause | Solution |
|---|---|---|
| MR pipeline fails | Syntax or schema error | Check validation logs |
| Data not in prod topic | Quality check failed | Check DLQ |
| High DLQ count | Source data issues | Run DLQ analysis |
| Schema Registry error | Incompatible schema | Check compatibility mode |
| Consumer can't deserialize | Schema version mismatch | Update consumer |
Issue: CI/CD Pipeline Failures¶
Symptom¶
The merge request (MR) pipeline fails at the validation stage.
Diagnosis¶
# 1. Check which job failed
# Look at GitLab CI pipeline view
# 2. Get job logs
# Click on failed job → View logs
# 3. Run validation locally
python ci/validate_contract.py domains/{domain}/{entity}/contract.yaml
Common Causes & Solutions¶
Invalid YAML Syntax¶
Missing Required Fields¶
# ❌ Missing owner
metadata:
  name: "orders"
  namespace: "sales"
# ✅ Correct
metadata:
  name: "orders"
  namespace: "sales"
  owner:
    team: "sales-team"
    email: "sales@company.ru"
Invalid Type¶
# ❌ Wrong type
- name: amount
  type: money # Not a valid type
# ✅ Correct
- name: amount
  type: decimal
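The two failure modes above (a missing required field and an invalid type) can be reproduced locally with a small sketch. This is not the logic of `ci/validate_contract.py`, which is not shown here. The `fields` key, the `REQUIRED_METADATA` tuple, and the `ALLOWED_TYPES` set are assumptions made for illustration; only `decimal` (valid) and `money` (invalid) come from the examples above.

```python
# Hypothetical re-implementation of two checks; the real validator may differ.
REQUIRED_METADATA = ("name", "namespace", "owner")  # taken from the example above
ALLOWED_TYPES = {"string", "integer", "decimal", "boolean", "timestamp"}  # assumed list


def check_contract(contract: dict) -> list[str]:
    """Return human-readable problems; an empty list means both checks passed."""
    problems = []
    metadata = contract.get("metadata", {})
    for key in REQUIRED_METADATA:
        if key not in metadata:
            problems.append(f"metadata.{key} is missing")
    # "fields" is an assumed name for the list that holds the field definitions.
    for field in contract.get("fields", []):
        if field.get("type") not in ALLOWED_TYPES:
            problems.append(
                f"field '{field.get('name')}' has invalid type '{field.get('type')}'"
            )
    return problems


contract = {
    "metadata": {"name": "orders", "namespace": "sales"},
    "fields": [{"name": "amount", "type": "money"}],
}
for problem in check_contract(contract):
    print(problem)
```

Running it on the broken contract reports both the missing `owner` and the invalid `money` type in one pass, which is usually more convenient than fixing one CI failure at a time.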
Prevention¶
# Install pre-commit hook
cp ci/hooks/pre-commit .git/hooks/
chmod +x .git/hooks/pre-commit
# Now validation runs before each commit
Issue: Data Not Appearing in Prod Topic¶
Symptom¶
Producer sends data, but nothing appears in {namespace}.{entity}_prod.
Diagnosis Flowchart¶
┌────────────────────────────────────────────────────────────┐
│                   Data not in prod topic                   │
└────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌────────────────────────────────────────────────────────────┐
│ Step 1: Check if data is in raw topic                      │
│ kafkacat -b kafka:9092 -t {ns}.{entity}.raw -C -c 5 -o end │
└────────────────────────────────────────────────────────────┘
                              │
             ┌────────────────┴─────────────────┐
             │                                  │
             ▼                                  ▼
        ┌──────────┐                       ┌──────────┐
        │ Data OK  │                       │ No Data  │
        └────┬─────┘                       └────┬─────┘
             │                                  │
             ▼                                  ▼
┌─────────────────────────┐        ┌─────────────────────────┐
│ Step 2: Check DLQ       │        │ Problem is upstream     │
│ kafkacat ... _dlq ...   │        │ - Check producer logs   │
└────────────┬────────────┘        │ - Check API Gateway     │
             │                     │ - Check connectivity    │
             ▼                     └─────────────────────────┘
┌─────────────────────────┐
│ Data in DLQ?            │
│   Yes → Quality issue   │
│   No  → Validator issue │
└─────────────────────────┘
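The flowchart above reduces to a two-input decision. The sketch below encodes it as a plain function, for example to embed in a runbook script. The function name and the wording of the returned strings are illustrative and not part of the existing tooling.

```python
def diagnose(raw_has_data: bool, dlq_has_data: bool) -> str:
    """Mirror the diagnosis flowchart: raw topic first, then the DLQ."""
    if not raw_has_data:
        # Nothing reached the raw topic, so the validator never saw the data.
        return "upstream: check producer logs, API Gateway, connectivity"
    if dlq_has_data:
        # Records arrived but were rejected by the quality rules.
        return "quality issue: records are failing validation"
    # Records arrived, were not rejected, and still did not reach prod.
    return "validator issue: check validator logs and metrics"


print(diagnose(raw_has_data=True, dlq_has_data=False))
```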
Step-by-Step¶
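# Step 1: Check raw topic
# (-o end prints only messages that arrive after the command starts;
#  use "-o -5 -e" instead to read the most recent stored messages and exit)
kafkacat -b kafka:9092 -t sales.orders.raw -C -c 5 -o end
# Step 2: Check DLQ
kafkacat -b kafka:9092 -t sales.orders_dlq -C -c 5 -o end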
# Step 3: Check validator logs
kubectl logs -l app=quality-validator --tail=100
# Step 4: Check validator metrics
curl http://quality-validator:8080/metrics | grep dlq
Common Causes¶
| Cause | Symptoms | Solution |
|---|---|---|
| All data fails quality | DLQ full of records | Review quality rules |
| Validator crashed | No new data in any topic | Restart validator |
| Wrong topic name | Empty raw topic | Check producer config |
| Network issues | Intermittent | Check connectivity |
Issue: High DLQ Count¶
Symptom¶
The DLQ message count is growing rapidly or staying high.
Immediate Actions¶
# 1. Assess severity
DLQ_COUNT=$(kafkacat -b kafka:9092 -t sales.orders_dlq -C -e -q | wc -l)
echo "DLQ count: $DLQ_COUNT"
# 2. Get sample of errors (last 10 messages per partition, then exit)
kafkacat -b kafka:9092 -t sales.orders_dlq -C -o -10 -e -q | \
  jq -r '.validation_errors[].rule_name' | sort | uniq -c | sort -rn
# 3. Identify pattern
python scripts/dlq_analysis.py --topic sales.orders_dlq --limit 1000
Analysis Script¶
# scripts/dlq_analysis.py
import argparse
import json
from collections import Counter

from kafka import KafkaConsumer


def analyze_dlq(topic: str, limit: int = 1000):
    """Tally failing rules and fields across up to `limit` DLQ messages."""
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=['kafka:9092'],
        # Read messages already in the topic; 'latest' would only see
        # records produced after the script starts.
        auto_offset_reset='earliest',
        # Stop iterating after 10 s of silence instead of blocking forever
        # when the topic holds fewer than `limit` messages.
        consumer_timeout_ms=10000,
        value_deserializer=lambda x: json.loads(x.decode('utf-8'))
    )
    error_counter = Counter()
    field_counter = Counter()
    processed = 0
    for msg in consumer:
        if processed >= limit:
            break
        for error in msg.value.get('validation_errors', []):
            error_counter[error['rule_name']] += 1
            field_counter[error['field']] += 1
        processed += 1
    consumer.close()

    print(f"Analyzed {processed} messages")
    print("\n=== TOP FAILING RULES ===")
    for rule, n in error_counter.most_common(10):
        print(f"  {rule}: {n}")
    print("\n=== TOP PROBLEMATIC FIELDS ===")
    for field, n in field_counter.most_common(10):
        print(f"  {field}: {n}")


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('--topic', required=True)
    parser.add_argument('--limit', type=int, default=1000)
    args = parser.parse_args()
    analyze_dlq(args.topic, args.limit)
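To try the tallying logic without a Kafka connection, it can be pulled out into a pure function and fed hand-written records. The sketch below assumes the same record shape the script reads (`validation_errors` entries with `rule_name` and `field`). The function name `tally_errors` and the sample records are made up for illustration.

```python
from collections import Counter


def tally_errors(records):
    """Count failing rules and fields across already-decoded DLQ records."""
    rules, fields = Counter(), Counter()
    for record in records:
        for error in record.get("validation_errors", []):
            # Tolerate malformed entries instead of raising KeyError.
            rules[error.get("rule_name", "<unknown>")] += 1
            fields[error.get("field", "<unknown>")] += 1
    return rules, fields


# Illustrative records only, not real DLQ output.
sample = [
    {"validation_errors": [{"rule_name": "not_null", "field": "email"}]},
    {"validation_errors": [
        {"rule_name": "not_null", "field": "email"},
        {"rule_name": "length_check", "field": "name"},
    ]},
    {},  # a record without validation_errors contributes nothing
]
rules, fields = tally_errors(sample)
print(rules.most_common())
print(fields.most_common())
```

Keeping the counting separate from the consumer loop also makes it possible to run the same analysis on a file dumped with `kafkacat`.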
Root Cause Categories¶
DLQ Issues
├── Source Data Issues (60%)
│ ├── Null values in required fields
│ ├── Invalid formats (email, date)
│ └── Out of range values
│
├── Schema Mismatch (25%)
│ ├── Source changed schema
│ ├── Contract not updated
│ └── Version mismatch
│
├── Quality Rules Too Strict (10%)
│ ├── Business logic changed
│ └── Edge cases not considered
│
└── System Issues (5%)
├── Serialization errors
└── Validator bugs
Issue: Schema Registry Errors¶
Symptom¶
Registering a new schema version fails because the schema is incompatible with the version already registered for the subject.
Diagnosis¶
# Check current schema
curl http://schema-registry:8081/subjects/sales.orders-value/versions/latest | jq '.'
# Check compatibility
curl -X POST \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{...}"}' \
http://schema-registry:8081/compatibility/subjects/sales.orders-value/versions/latest
Solutions¶
1. Schema is backward incompatible¶
# Option A: Fix schema to be compatible
# - Don't remove fields
# - Don't change types
# - Add default values to new required fields
# Option B: Register as new subject (version bump)
# Subject: sales.orders_v2-value
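The three Option A rules can be checked before a schema is ever sent to the registry. The sketch below is a deliberately simplified pre-check on flat field maps. It follows the conservative rules listed above, is not the Schema Registry's own compatibility algorithm, and ignores nested records, unions, and type promotion. The function name and the field-map shape are assumptions.

```python
def option_a_violations(old_fields: dict, new_fields: dict) -> list[str]:
    """Compare flat field maps of the form {name: {"type": ..., "default": ...}}.

    The "default" key is optional; its absence marks a field as required.
    """
    violations = []
    for name, spec in old_fields.items():
        if name not in new_fields:
            violations.append(f"removed field: {name}")
        elif new_fields[name]["type"] != spec["type"]:
            violations.append(f"type changed: {name}")
    for name, spec in new_fields.items():
        if name not in old_fields and "default" not in spec:
            violations.append(f"new field without default: {name}")
    return violations


old = {"id": {"type": "string"}, "amount": {"type": "double"}}
new = {
    "id": {"type": "string"},
    "amount": {"type": "string"},    # type changed
    "currency": {"type": "string"},  # new field, no default
}
for violation in option_a_violations(old, new):
    print(violation)
```

The registry's `/compatibility` endpoint shown in the diagnosis step remains the authoritative check; a local pre-check like this only shortens the feedback loop.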
2. Compatibility mode issue¶
# Check compatibility mode
curl http://schema-registry:8081/config/sales.orders-value | jq '.'
# Change if needed (careful!)
curl -X PUT \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"compatibility": "BACKWARD"}' \
http://schema-registry:8081/config/sales.orders-value
Compatibility Modes¶
| Mode | Description | Use Case |
|---|---|---|
| BACKWARD | New schema can read old data | Default, safe |
| FORWARD | Old schema can read new data | Producers update first |
| FULL | Both backward and forward | Strictest |
| NONE | No compatibility checks | Migration only |
Issue: Consumer Deserialization Errors¶
Symptom¶
The consumer cannot deserialize messages because the schema version it expects does not match the one the messages were written with.
Diagnosis¶
# Check consumer schema version
from confluent_kafka.schema_registry import SchemaRegistryClient
sr = SchemaRegistryClient({'url': 'http://schema-registry:8081'})
versions = sr.get_versions('sales.orders-value')
print(f"Available versions: {versions}")
latest = sr.get_latest_version('sales.orders-value')
print(f"Latest: {latest.version}")
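When it is unclear which schema a failing message was written with, the schema ID can be read straight from the raw bytes. Confluent serializers frame each value as a magic byte `0` followed by a 4-byte big-endian schema ID and then the payload. The sketch below extracts that ID. Note that it is the registry-wide schema ID, not the per-subject version number printed above, and the helper name is illustrative.

```python
import struct


def schema_id_of(message_value: bytes) -> int:
    """Return the schema ID from a Confluent-framed message value."""
    if len(message_value) < 5 or message_value[0] != 0:
        raise ValueError("not in Confluent wire format (missing magic byte 0)")
    # Bytes 1..4 hold the schema ID as an unsigned big-endian integer.
    (schema_id,) = struct.unpack(">I", message_value[1:5])
    return schema_id


# Build a framed value by hand to show the layout; the payload is a placeholder.
framed = b"\x00" + struct.pack(">I", 42) + b"avro-payload"
print(schema_id_of(framed))  # 42
```

The ID can then be looked up with `sr.get_schema(schema_id)` to see the exact writer schema.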
Solutions¶
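# Option 1: Let the consumer resolve the writer schema from the registry
from confluent_kafka.avro import AvroConsumer  # legacy API
consumer = AvroConsumer({
    'bootstrap.servers': 'kafka:9092',
    'schema.registry.url': 'http://schema-registry:8081',
    'group.id': 'my-consumer',
})
# The writer schema is fetched by the schema ID embedded in each message.
# 'auto.register.schemas' is a producer-side serializer option and does not
# belong in the consumer config.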
# Option 2: Pin to specific schema version
from confluent_kafka.schema_registry import SchemaRegistryClient
sr = SchemaRegistryClient({'url': 'http://schema-registry:8081'})
schema = sr.get_version('sales.orders-value', 5) # Pin to version 5
Issue: Performance Problems¶
Symptom¶
High latency in data pipeline, validator falling behind.
Diagnosis¶
# Check consumer lag
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
--describe --group quality-validator
# Check validator metrics
curl http://quality-validator:8080/metrics | grep -E "(latency|processed|lag)"
Common Causes & Solutions¶
| Cause | Metric | Solution |
|---|---|---|
| Slow quality rules | High rule_execution_time | Optimize rules |
| Too many rules | High total_validation_time | Reduce/combine rules |
| Large messages | High message_size_bytes | Compress or split |
| Insufficient resources | High CPU/Memory | Scale horizontally |
Optimization Tips¶
# ❌ Slow: Complex regex on every record
rules:
  - name: "complex_validation"
    type: "regex"
    pattern: "^(?=.*[A-Z])(?=.*[a-z])(?=.*\\d)(?=.*[@$!%*?&]).{8,}$"
# ✅ Fast: Simple checks first, complex last
rules:
  - name: "not_null" # Fast
    type: "not_null"
  - name: "length_check" # Medium
    type: "string_length"
    max: 255
  - name: "complex_check" # Slow (only if above pass)
    type: "regex"
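The ordering only pays off if evaluation stops at the first failing rule, as the "only if above pass" comment suggests. The sketch below shows that short-circuit behavior on a single value. It assumes the validator works this way; the `RULES` list and `first_failure` helper are illustrative and not the validator's actual code.

```python
import re

# Same pattern as the slow example above, compiled once rather than per record.
COMPLEX = re.compile(r"^(?=.*[A-Z])(?=.*[a-z])(?=.*\d)(?=.*[@$!%*?&]).{8,}$")

# Ordered from cheapest to most expensive check.
RULES = [
    ("not_null", lambda v: v is not None),
    ("length_check", lambda v: len(v) <= 255),
    ("complex_check", lambda v: COMPLEX.match(v) is not None),
]


def first_failure(value):
    """Return the name of the first failing rule, or None if all pass."""
    for name, check in RULES:
        if not check(value):
            return name  # later, more expensive rules are never evaluated
    return None


for candidate in (None, "x" * 300, "weak", "Str0ng!pass"):
    print(first_failure(candidate))
```

Records that fail a cheap rule never reach the regex, so the expensive check runs only on values that have a chance of passing.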
Issue: Contract Versioning Conflicts¶
Symptom¶
CI fails with version check error.
Diagnosis¶
# Check current version
grep "contract_version" domains/sales/orders/contract.yaml
# Check what version should be
python ci/suggest_version.py --contract domains/sales/orders/contract.yaml
Solutions¶
# If breaking change detected but version not bumped:
contract_version: "1.2.0" # ❌ Wrong
contract_version: "2.0.0" # ✅ Correct (MAJOR bump)
# If non-breaking change:
contract_version: "1.2.0" # ❌ Wrong
contract_version: "1.3.0" # ✅ Correct (MINOR bump)
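The bump rule illustrated above can be written as a small helper. This is a sketch of the rule as shown in the two examples, not the logic of `ci/suggest_version.py`. It handles only MAJOR and MINOR bumps, and the function name is hypothetical.

```python
def next_version(current: str, breaking: bool) -> str:
    """Suggest the next contract_version for a change to the contract."""
    major, minor, _patch = (int(part) for part in current.split("."))
    if breaking:
        return f"{major + 1}.0.0"  # MAJOR bump resets MINOR and PATCH
    return f"{major}.{minor + 1}.0"  # MINOR bump resets PATCH


print(next_version("1.2.0", breaking=True))   # 2.0.0
print(next_version("1.2.0", breaking=False))  # 1.3.0
```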
Useful Commands Cheatsheet¶
# ═══════════════════════════════════════════════════════════════
# KAFKA
# ═══════════════════════════════════════════════════════════════
# List topics
kafka-topics.sh --bootstrap-server kafka:9092 --list | grep sales
# Get topic info
kafka-topics.sh --bootstrap-server kafka:9092 --describe --topic sales.orders_prod
# Read last N messages per partition (here N=10), then exit
kafkacat -b kafka:9092 -t sales.orders_prod -C -o -10 -e
# Count messages
kafkacat -b kafka:9092 -t sales.orders_dlq -C -e -q | wc -l
# Consumer lag
kafka-consumer-groups.sh --bootstrap-server kafka:9092 --describe --group quality-validator
# ═══════════════════════════════════════════════════════════════
# SCHEMA REGISTRY
# ═══════════════════════════════════════════════════════════════
# List subjects
curl http://schema-registry:8081/subjects | jq '.'
# Get latest schema
curl http://schema-registry:8081/subjects/sales.orders-value/versions/latest | jq '.'
# Get all versions
curl http://schema-registry:8081/subjects/sales.orders-value/versions | jq '.'
# Check compatibility
curl -X POST \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data @new_schema.json \
http://schema-registry:8081/compatibility/subjects/sales.orders-value/versions/latest
# ═══════════════════════════════════════════════════════════════
# CONTRACTS
# ═══════════════════════════════════════════════════════════════
# Validate contract
python ci/validate_contract.py domains/sales/orders/contract.yaml
# Check breaking changes
python ci/detect_breaking_changes.py --contract domains/sales/orders/contract.yaml
# Suggest version
python ci/suggest_version.py --contract domains/sales/orders/contract.yaml
# ═══════════════════════════════════════════════════════════════
# DLQ
# ═══════════════════════════════════════════════════════════════
# Analyze DLQ
python scripts/dlq_analysis.py --topic sales.orders_dlq --limit 100
# Reprocess DLQ (dry run)
python scripts/dlq_reprocess.py --topic sales.orders_dlq --dry-run
# Reprocess DLQ (actual)
python scripts/dlq_reprocess.py --topic sales.orders_dlq
Getting Help¶
Self-Service Resources¶
- Documentation: docs/ folder in contracts repo
- Runbooks: domains/{domain}/{entity}/runbook.md
- FAQ: docs/faq.md
Support Channels¶
| Channel | Response Time | Use For |
|---|---|---|
| #data-contracts-help | < 2 hours | General questions |
| #data-platform-oncall | < 30 min | Urgent issues |
| PagerDuty | < 15 min | P1 incidents only |
Escalation Path¶
1. Slack #data-contracts-help
↓ (no response in 2 hours)
2. Slack #data-platform-oncall
↓ (no response in 30 min OR P1)
3. PagerDuty data-platform-oncall
Version: 1.0 Last updated: January 24, 2026