DLQ Process & Runbook¶
Обзор¶
Dead Letter Queue (DLQ) — это Kafka топик, куда попадают данные, не прошедшие проверку качества.
┌────────────────┐ ┌─────────────┐ ┌────────────────┐
│ Raw Topic │────▶│ Validator │────▶│ Prod Topic │ ✅ Quality OK
│ {ns}.{e}.raw │ │ │ │ {ns}.{e}_prod │
└────────────────┘ └──────┬──────┘ └────────────────┘
│
│ Quality Check Failed
▼
┌────────────────┐
│ DLQ Topic │ ⚠️ Needs Attention
│ {ns}.{e}_dlq │
└────────────────┘
Структура DLQ Record¶
{
"original_record": {
"order_id": null,
"customer_id": "cust_abc123",
"total_amount": -100.50,
"status": "unknown"
},
"validation_errors": [
{
"rule_name": "order_id_not_null",
"field": "order_id",
"expected": "non-null value",
"actual": "null",
"severity": "error",
"message": "Field 'order_id' is required but was null"
},
{
"rule_name": "total_amount_positive",
"field": "total_amount",
"expected": "value >= 0",
"actual": "-100.50",
"severity": "error",
"message": "Field 'total_amount' must be >= 0"
},
{
"rule_name": "status_valid_enum",
"field": "status",
"expected": "one of: pending, confirmed, shipped, delivered, cancelled",
"actual": "unknown",
"severity": "error",
"message": "Field 'status' has invalid value"
}
],
"metadata": {
"contract_namespace": "sales",
"contract_name": "orders",
"contract_version": "1.2.0",
"source_topic": "sales.orders.raw",
"timestamp": "2026-01-23T10:30:00.000Z",
"validator_version": "1.0.0",
"trace_id": "abc123def456"
}
}
DLQ Lifecycle¶
┌──────────────────────────────────────────────────────────────────────────────┐
│ DLQ LIFECYCLE │
├──────────────────────────────────────────────────────────────────────────────┤
│ │
│ 1. DETECTION 2. NOTIFICATION │
│ ┌─────────────────────────┐ ┌─────────────────────────┐ │
│ │ Validator detects │───▶│ Alert sent to owner: │ │
│ │ quality issue │ │ - Slack │ │
│ │ Record → DLQ │ │ - Email │ │
│ └─────────────────────────┘ │ - PagerDuty (critical) │ │
│ └───────────┬─────────────┘ │
│ │ │
│ 3. INVESTIGATION ▼ │
│ ┌─────────────────────────┐ ┌─────────────────────────┐ │
│ │ Owner investigates: │◀───│ Owner acknowledges │ │
│ │ - View DLQ records │ │ alert │ │
│ │ - Identify root cause │ └─────────────────────────┘ │
│ │ - Assess impact │ │
│ └───────────┬─────────────┘ │
│ │ │
│ ▼ │
│ 4. RESOLUTION 5. VERIFICATION │
│ ┌─────────────────────────┐ ┌─────────────────────────┐ │
│ │ Fix applied: │───▶│ Re-send corrected data │ │
│ │ - Fix source system │ │ Monitor DLQ drain │ │
│ │ - Or update rules │ │ Confirm resolution │ │
│ └─────────────────────────┘ └─────────────────────────┘ │
│ │
└──────────────────────────────────────────────────────────────────────────────┘
Runbook: DLQ Alert Response¶
Шаг 1: Acknowledge Alert¶
Время: < 5 минут
-
Подтвердите получение алерта в Slack:
-
Или в PagerDuty (для critical):
- Acknowledge инцидент
- Добавьте комментарий
Шаг 2: Assess Severity¶
Время: 5-10 минут
# Проверьте количество записей в DLQ
kafkacat -b kafka:9092 -t sales.orders_dlq -C -e -q | wc -l
# Проверьте скорость накопления (последние 5 минут)
kafkacat -b kafka:9092 -t sales.orders_dlq -C -o -5m -e -q | wc -l
| DLQ Count | Growth Rate | Severity | Response Time |
|---|---|---|---|
| > 1000 | > 100/min | 🔴 Critical | 15 min |
| > 500 | > 50/min | 🔴 Critical | 15 min |
| > 100 | Any | 🟠 High | 1 hour |
| < 100 | Stable | 🟡 Medium | 4 hours |
| < 10 | Decreasing | ⚪ Low | 1 day |
Шаг 3: Investigate Root Cause¶
Время: 10-30 минут
3.1 Просмотр ошибок¶
# Получить последние 10 DLQ записей
kafkacat -b kafka:9092 -t sales.orders_dlq -C -c 10 -o end | jq '.'
# Или через Python скрипт
python scripts/dlq_inspector.py --topic sales.orders_dlq --limit 10
3.2 Анализ паттернов¶
# Какие правила чаще всего нарушаются?
from collections import Counter
errors = []
for record in dlq_records:
for error in record['validation_errors']:
errors.append(error['rule_name'])
print(Counter(errors).most_common(5))
# Пример вывода:
# [('order_id_not_null', 450), ('total_amount_positive', 230), ...]
3.3 Определение root cause¶
| Паттерн | Вероятная причина | Действие |
|---|---|---|
| Одно правило, много ошибок | Bug в источнике данных | Исправить источник |
| Много правил, одно время | Массовый сбой | Проверить upstream |
| Постепенный рост | Degradation | Проверить data pipeline |
| Новые ошибки после deploy | Regression | Откат или hotfix |
Шаг 4: Communicate Impact¶
Время: 5 минут
Уведомите stakeholders:
🔔 DLQ Alert: sales.orders
Status: Investigating
Impact: [X] records in DLQ, [Y] affected consumers
Root Cause: [Preliminary assessment]
ETA: [Estimated time to resolution]
Affected dashboards/reports:
- Revenue Dashboard (High)
- Daily Orders Report (Medium)
Owner: @sales-integration
Шаг 5: Apply Fix¶
Вариант A: Исправление в источнике данных¶
# Пример: исправление в 1C выгрузке
# До исправления данные приходили без order_id
# После исправления:
def export_order(order):
if not order.get('order_id'):
order['order_id'] = generate_uuid() # Или пропустить запись
return order
Вариант B: Изменение quality rules (если правила слишком строгие)¶
# quality_rules.yml
rules:
- name: "total_amount_valid"
field: "total_amount"
type: "range"
# Было: min: 0
min: -1000 # Разрешить возвраты до 1000 руб
severity: "error"
⚠️ Внимание: Изменение rules требует MR и review!
Шаг 6: Re-process DLQ Data¶
Автоматический reprocess (если исправлен источник)¶
# Переслать данные из DLQ обратно в raw топик
python scripts/dlq_reprocess.py \
--source sales.orders_dlq \
--target sales.orders.raw \
--dry-run # Сначала проверить
# Если dry-run OK:
python scripts/dlq_reprocess.py \
--source sales.orders_dlq \
--target sales.orders.raw
Ручной reprocess (если нужна корректировка данных)¶
# scripts/dlq_fix_and_reprocess.py
from kafka import KafkaConsumer, KafkaProducer
import json
consumer = KafkaConsumer(
'sales.orders_dlq',
bootstrap_servers=['kafka:9092'],
auto_offset_reset='earliest',
value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
producer = KafkaProducer(
bootstrap_servers=['kafka:9092'],
value_serializer=lambda x: json.dumps(x).encode('utf-8')
)
for msg in consumer:
dlq_record = msg.value
original = dlq_record['original_record']
# Применить fix
if original.get('order_id') is None:
original['order_id'] = f"recovered_{msg.offset}"
if original.get('total_amount', 0) < 0:
original['total_amount'] = 0 # Или пропустить
# Отправить исправленную запись
producer.send('sales.orders.raw', value=original)
producer.flush()
Шаг 7: Verify Resolution¶
Время: 15-30 минут
# Проверить, что DLQ пустеет
watch -n 30 'kafkacat -b kafka:9092 -t sales.orders_dlq -C -e -q | wc -l'
# Проверить, что данные появляются в prod
kafkacat -b kafka:9092 -t sales.orders_prod -C -c 5 -o end | jq '.'
Шаг 8: Close Incident¶
-
Обновите статус в Slack:
-
Создайте postmortem (для Critical/High):
- What happened?
- Impact?
- Root cause?
- Timeline?
- Prevention measures?
DLQ Alerts Configuration¶
Slack Alert Format¶
🚨 DLQ Alert: {namespace}.{entity}
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Severity: {severity}
DLQ Count: {count} records
Growth: {rate}/min
Top Errors:
1. {rule_1}: {count_1} occurrences
2. {rule_2}: {count_2} occurrences
3. {rule_3}: {count_3} occurrences
Owner: {owner_team}
Runbook: {runbook_url}
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
@{owner_slack_handle}
Alert Thresholds¶
# alerting_config.yaml
alerts:
dlq_critical:
condition: "dlq_count > 500 OR dlq_growth_rate > 100/min"
channels: ["slack", "pagerduty"]
repeat_interval: "5m"
dlq_warning:
condition: "dlq_count > 100"
channels: ["slack"]
repeat_interval: "15m"
dlq_info:
condition: "dlq_count > 10"
channels: ["slack"]
repeat_interval: "1h"
Prevention Best Practices¶
Для Producers¶
-
Валидируйте данные перед отправкой
-
Тестируйте против контракта
-
Мониторьте DLQ вашего контракта
- Добавьте dashboard в вашу Grafana
- Настройте алерты в ваш канал
Для Data Platform¶
-
Автоматический retention для DLQ
-
Dead-dead letter queue (для критичных случаев)
- Если DLQ переполняется
-
Архивировать в S3/object storage
-
Регулярный аудит DLQ
- Еженедельный review накопившихся DLQ
- Автоматические напоминания owners
Версия: 1.0 Последнее обновление: 24 января 2026