Перейти к содержанию

Consumer Guide

Обзор

Вы — потребитель данных (Consumer). Контракты данных гарантируют вам:

  • ✅ Стабильную схему данных
  • ✅ Известное качество данных
  • ✅ Уведомления о критических изменениях
  • ✅ Ответственного за проблемы (Owner)

Quick Start

Шаг 1: Найдите нужный контракт

# Список всех контрактов
ls contracts/domains/

# Структура:
# domains/
# ├── sales/
# │   ├── orders/
# │   │   ├── contract.yaml
# │   │   └── quality_rules.yml
# │   └── customers/
# ├── warehouse/
# └── marketing/

Шаг 2: Изучите контракт

# Просмотр контракта
cat contracts/domains/sales/orders/contract.yaml

Ключевая информация: - schema.fields — какие поля доступны - sla.freshness — насколько свежие данные - quality_rules — какие проверки проходят данные - metadata.owner — к кому обращаться с вопросами

Шаг 3: Подключитесь к данным

Kafka Consumer (рекомендуется)

from confluent_kafka import Consumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer

# Конфигурация
kafka_conf = {
    'bootstrap.servers': 'kafka:9092',
    'group.id': 'your-consumer-group',
    'auto.offset.reset': 'earliest'
}

schema_registry_conf = {'url': 'http://schema-registry:8081'}

# Создание consumer
schema_registry_client = SchemaRegistryClient(schema_registry_conf)
avro_deserializer = AvroDeserializer(schema_registry_client)

consumer = Consumer(kafka_conf)

# Подписка на prod топик (качественные данные!)
# Формат: {namespace}.{name}_prod
consumer.subscribe(['sales.orders_prod'])

# Чтение данных
while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        print(f"Consumer error: {msg.error()}")
        continue

    record = avro_deserializer(msg.value(), SerializationContext(
        msg.topic(), MessageField.VALUE
    ))

    # Данные гарантированно прошли quality checks!
    process_order(record)

SQL (через ClickHouse/Postgres)

-- Данные из prod топиков реплицируются в DWH
SELECT 
    order_id,
    customer_id,
    total_amount,
    status,
    created_at
FROM sales.orders_prod
WHERE created_at >= today() - interval 7 day;

REST API (если доступен)

import requests

response = requests.get(
    "https://data-api.company.ru/v1/sales/orders",
    params={
        "limit": 100,
        "created_after": "2026-01-01"
    },
    headers={"Authorization": "Bearer your-token"}
)

orders = response.json()['data']

Понимание качества данных

Что означает _prod топик

Данные в {namespace}.{entity}_prod топике прошли все проверки из quality_rules.yml:

# Пример правил для sales.orders
rules:
  - name: "order_id_not_null"      # ✓ order_id гарантированно есть
    field: "order_id"
    type: "not_null"
    severity: "error"

  - name: "total_amount_positive"   # ✓ сумма >= 0
    field: "total_amount"
    type: "range"
    min: 0
    severity: "error"

Severity levels

Severity В _prod В _dlq
error Никогда Всегда
warning Да (с предупреждением) Нет
info Да Нет

Работа с изменениями схемы

Вы получите уведомление

При любом изменении контракта вы получите: - 📧 Email (если подписаны) - 💬 Сообщение в Slack - 📝 Merge Request с деталями

Типы изменений

✅ Backward Compatible (безопасные)

# Было
fields:
  - name: order_id
    type: string

# Стало (добавлено nullable поле)
fields:
  - name: order_id
    type: string
  - name: discount         # Новое поле
    type: decimal
    required: false        # Nullable = безопасно

Ваши действия: Можете игнорировать или начать использовать новое поле.

⚠️ Breaking Changes (требуют внимания)

# Было
fields:
  - name: amount
    type: decimal

# Стало (изменён тип!)
fields:
  - name: amount
    type: string          # Breaking change!

Ваши действия: 1. Получите уведомление с датой migration 2. Обновите ваш код до указанной даты 3. Подтвердите готовность

Подписка на изменения

Добавьте вашу команду в consumers контракта:

# contract.yaml
lineage:
  consumers:
    - name: "analytics_team"
      usage: "revenue_dashboard"
      criticality: "high"
      contact: "analytics@company.ru"   # ← Добавьте ваш контакт

Обработка проблем

Данные не поступают

  1. Проверьте статус prod топика:

    kafkacat -b kafka:9092 -L | grep sales.orders_prod
    

  2. Проверьте SLA:

    # contract.yaml
    sla:
      freshness: "PT1H"   # Данные должны быть не старше 1 часа
    

  3. Если SLA нарушен — свяжитесь с owner:

    metadata:
      owner:
        team: "sales-integration"
        email: "sales-data@company.ru"
        slack: "#sales-data-alerts"
    

Данные некорректные

  1. Проверьте, какие quality rules применяются
  2. Если данные в _prod всё равно некорректные:
  3. Это баг в quality_rules
  4. Создайте issue и уведомите owner

Нужен новый источник данных

  1. Проверьте, существует ли контракт
  2. Если нет — запросите у команды-владельца
  3. Если есть, но не подходит — обсудите изменения

Best Practices

DO ✅

# ✅ Используйте типизированные модели
from pydantic import BaseModel
from datetime import datetime
from decimal import Decimal

class Order(BaseModel):
    order_id: str
    customer_id: str
    total_amount: Decimal
    status: str
    created_at: datetime

# ✅ Обрабатывайте nullable поля
def process_order(order: Order):
    discount = order.discount if order.discount else Decimal(0)

# ✅ Подписывайтесь на изменения
# Добавьте вашу команду в consumers контракта

DON'T ❌

# ❌ Не хардкодьте структуру данных
data = json.loads(msg)
amount = data['amount']  # Может сломаться!

# ❌ Не игнорируйте schema evolution
# Всегда используйте Avro deserializer

# ❌ Не читайте из raw топика
# raw = не прошёл quality checks!
consumer.subscribe(['sales.orders.raw'])  # ПЛОХО!

# ✅ Читайте из prod топика
consumer.subscribe(['sales.orders_prod'])  # ХОРОШО!

Мониторинг

Grafana Dashboard

Для каждого контракта доступен dashboard: https://grafana.company.ru/d/data-contracts/{namespace}-{entity}

Метрики: - Record count (prod vs dlq) - Latency (time to validate) - Quality pass rate - Consumer lag

Alerts

Подпишитесь на алерты о проблемах с данными: - Slack: Добавьте интеграцию в ваш канал - Email: Укажите в consumers контракта

FAQ

Q: Могу ли я читать из DLQ?

A: Технически да, но не рекомендуется. DLQ содержит данные с ошибками качества. Если вам нужны эти данные — обсудите с owner изменение quality_rules.

Q: Как узнать, когда данные обновляются?

A: Смотрите lineage.source.extraction_schedule в контракте:

lineage:
  source:
    extraction_schedule: "*/5 * * * *"  # Каждые 5 минут

Q: Что делать, если нужно поле, которого нет?

A: Создайте Merge Request с добавлением поля в контракт или обсудите с owner. Добавление nullable поля — это non-breaking change.

Q: Как протестировать интеграцию?

A: 1. Используйте dev окружение 2. Подпишитесь на {namespace}.{entity}_prod в dev Kafka 3. Проверьте обработку тестовых данных


Версия: 1.0 Последнее обновление: 24 января 2026