Consumer Guide¶
Обзор¶
Вы — потребитель данных (Consumer). Контракты данных гарантируют вам:
- ✅ Стабильную схему данных
- ✅ Известное качество данных
- ✅ Уведомления о критических изменениях
- ✅ Ответственного за проблемы (Owner)
Quick Start¶
Шаг 1: Найдите нужный контракт¶
# Список всех контрактов
ls contracts/domains/
# Структура:
# domains/
# ├── sales/
# │ ├── orders/
# │ │ ├── contract.yaml
# │ │ └── quality_rules.yml
# │ └── customers/
# ├── warehouse/
# └── marketing/
Шаг 2: Изучите контракт¶
Ключевая информация: - schema.fields — какие поля доступны - sla.freshness — насколько свежие данные - quality_rules — какие проверки проходят данные - metadata.owner — к кому обращаться с вопросами
Шаг 3: Подключитесь к данным¶
Kafka Consumer (рекомендуется)¶
from confluent_kafka import Consumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
# Конфигурация
kafka_conf = {
'bootstrap.servers': 'kafka:9092',
'group.id': 'your-consumer-group',
'auto.offset.reset': 'earliest'
}
schema_registry_conf = {'url': 'http://schema-registry:8081'}
# Создание consumer
schema_registry_client = SchemaRegistryClient(schema_registry_conf)
avro_deserializer = AvroDeserializer(schema_registry_client)
consumer = Consumer(kafka_conf)
# Подписка на prod топик (качественные данные!)
# Формат: {namespace}.{name}_prod
consumer.subscribe(['sales.orders_prod'])
# Чтение данных
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
print(f"Consumer error: {msg.error()}")
continue
record = avro_deserializer(msg.value(), SerializationContext(
msg.topic(), MessageField.VALUE
))
# Данные гарантированно прошли quality checks!
process_order(record)
SQL (через ClickHouse/Postgres)¶
-- Данные из prod топиков реплицируются в DWH
SELECT
order_id,
customer_id,
total_amount,
status,
created_at
FROM sales.orders_prod
WHERE created_at >= today() - interval 7 day;
REST API (если доступен)¶
import requests
response = requests.get(
"https://data-api.company.ru/v1/sales/orders",
params={
"limit": 100,
"created_after": "2026-01-01"
},
headers={"Authorization": "Bearer your-token"}
)
orders = response.json()['data']
Понимание качества данных¶
Что означает _prod топик¶
Данные в {namespace}.{entity}_prod топике прошли все проверки из quality_rules.yml:
# Пример правил для sales.orders
rules:
- name: "order_id_not_null" # ✓ order_id гарантированно есть
field: "order_id"
type: "not_null"
severity: "error"
- name: "total_amount_positive" # ✓ сумма >= 0
field: "total_amount"
type: "range"
min: 0
severity: "error"
Severity levels¶
| Severity | В _prod | В _dlq |
|---|---|---|
error | Никогда | Всегда |
warning | Да (с предупреждением) | Нет |
info | Да | Нет |
Работа с изменениями схемы¶
Вы получите уведомление¶
При любом изменении контракта вы получите: - 📧 Email (если подписаны) - 💬 Сообщение в Slack - 📝 Merge Request с деталями
Типы изменений¶
✅ Backward Compatible (безопасные)¶
# Было
fields:
- name: order_id
type: string
# Стало (добавлено nullable поле)
fields:
- name: order_id
type: string
- name: discount # Новое поле
type: decimal
required: false # Nullable = безопасно
Ваши действия: Можете игнорировать или начать использовать новое поле.
⚠️ Breaking Changes (требуют внимания)¶
# Было
fields:
- name: amount
type: decimal
# Стало (изменён тип!)
fields:
- name: amount
type: string # Breaking change!
Ваши действия: 1. Получите уведомление с датой migration 2. Обновите ваш код до указанной даты 3. Подтвердите готовность
Подписка на изменения¶
Добавьте вашу команду в consumers контракта:
# contract.yaml
lineage:
consumers:
- name: "analytics_team"
usage: "revenue_dashboard"
criticality: "high"
contact: "analytics@company.ru" # ← Добавьте ваш контакт
Обработка проблем¶
Данные не поступают¶
-
Проверьте статус prod топика:
-
Проверьте SLA:
-
Если SLA нарушен — свяжитесь с owner:
Данные некорректные¶
- Проверьте, какие quality rules применяются
- Если данные в
_prodвсё равно некорректные: - Это баг в quality_rules
- Создайте issue и уведомите owner
Нужен новый источник данных¶
- Проверьте, существует ли контракт
- Если нет — запросите у команды-владельца
- Если есть, но не подходит — обсудите изменения
Best Practices¶
DO ✅¶
# ✅ Используйте типизированные модели
from pydantic import BaseModel
from datetime import datetime
from decimal import Decimal
class Order(BaseModel):
order_id: str
customer_id: str
total_amount: Decimal
status: str
created_at: datetime
# ✅ Обрабатывайте nullable поля
def process_order(order: Order):
discount = order.discount if order.discount else Decimal(0)
# ✅ Подписывайтесь на изменения
# Добавьте вашу команду в consumers контракта
DON'T ❌¶
# ❌ Не хардкодьте структуру данных
data = json.loads(msg)
amount = data['amount'] # Может сломаться!
# ❌ Не игнорируйте schema evolution
# Всегда используйте Avro deserializer
# ❌ Не читайте из raw топика
# raw = не прошёл quality checks!
consumer.subscribe(['sales.orders.raw']) # ПЛОХО!
# ✅ Читайте из prod топика
consumer.subscribe(['sales.orders_prod']) # ХОРОШО!
Мониторинг¶
Grafana Dashboard¶
Для каждого контракта доступен dashboard: https://grafana.company.ru/d/data-contracts/{namespace}-{entity}
Метрики: - Record count (prod vs dlq) - Latency (time to validate) - Quality pass rate - Consumer lag
Alerts¶
Подпишитесь на алерты о проблемах с данными: - Slack: Добавьте интеграцию в ваш канал - Email: Укажите в consumers контракта
FAQ¶
Q: Могу ли я читать из DLQ?¶
A: Технически да, но не рекомендуется. DLQ содержит данные с ошибками качества. Если вам нужны эти данные — обсудите с owner изменение quality_rules.
Q: Как узнать, когда данные обновляются?¶
A: Смотрите lineage.source.extraction_schedule в контракте:
Q: Что делать, если нужно поле, которого нет?¶
A: Создайте Merge Request с добавлением поля в контракт или обсудите с owner. Добавление nullable поля — это non-breaking change.
Q: Как протестировать интеграцию?¶
A: 1. Используйте dev окружение 2. Подпишитесь на {namespace}.{entity}_prod в dev Kafka 3. Проверьте обработку тестовых данных
Версия: 1.0 Последнее обновление: 24 января 2026