Producer Onboarding Guide¶
Добро пожаловать!¶
Этот гайд поможет вам, как издателю данных (Producer), начать работу с системой контрактов данных.
Что такое Data Contract?¶
Контракт данных — это формальное соглашение между вами (издателем) и потребителями ваших данных. Он определяет:
- ✅ Какие данные вы отправляете (схема)
- ✅ Какого качества эти данные (правила)
- ✅ Кто отвечает за данные (вы!)
- ✅ Как быстро реагировать на проблемы (SLA)
Quick Start (15 минут)¶
Шаг 1: Клонируйте репозиторий контрактов¶
Шаг 2: Создайте ветку для вашего контракта¶
git checkout -b feat/add-{domain}-{entity}-contract
# Пример: git checkout -b feat/add-sales-orders-contract
Шаг 3: Скопируйте шаблон контракта¶
# Создайте директорию для вашего контракта
mkdir -p domains/{domain}/{entity}
# Скопируйте шаблон
cp templates/contract-template.yaml domains/{domain}/{entity}/contract.yaml
cp templates/quality_rules-template.yml domains/{domain}/{entity}/quality_rules.yml
Шаг 4: Заполните контракт¶
Откройте domains/{domain}/{entity}/contract.yaml и заполните:
spec_version: "1.0.0"
contract_version: "1.0.0"
metadata:
name: "your_entity_name" # ← Имя вашей сущности
namespace: "your_domain" # ← Ваш домен
description: "Описание данных" # ← Что это за данные?
owner:
team: "your-team" # ← Ваша команда
email: "your-team@company.ru" # ← Email для алертов
schema:
format: "avro"
fields:
- name: "id" # ← Ваши поля
type: "string"
required: true
description: "Уникальный ID"
# ... добавьте остальные поля
Шаг 5: Добавьте правила качества¶
Откройте domains/{domain}/{entity}/quality_rules.yml:
version: "1.0"
rules:
- name: "id_not_null"
field: "id"
type: "not_null"
severity: "error"
# Добавьте правила для ваших полей
Шаг 6: Проверьте локально¶
# Валидация синтаксиса
python ci/validate_contract.py domains/{domain}/{entity}/contract.yaml
# Ожидаемый результат:
# ✓ Contract is valid
Шаг 7: Создайте Merge Request¶
git add domains/{domain}/{entity}/
git commit -m "feat: add {domain}/{entity} data contract"
git push -u origin feat/add-{domain}-{entity}-contract
Перейдите в GitLab и создайте Merge Request.
Шаг 8: Пройдите Code Review¶
- CI/CD автоматически проверит ваш контракт
- Data Platform team проведёт review
- После approval — merge!
Интеграция с вашим сервисом¶
Вариант 1: Kafka Producer (рекомендуется)¶
from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
import json
# Конфигурация
schema_registry_conf = {'url': 'http://schema-registry:8081'}
kafka_conf = {
'bootstrap.servers': 'kafka:9092',
'client.id': 'your-service-name'
}
# Загрузка схемы из контракта
with open('path/to/contract.yaml') as f:
contract = yaml.safe_load(f)
# Создание producer
schema_registry_client = SchemaRegistryClient(schema_registry_conf)
avro_serializer = AvroSerializer(
schema_registry_client,
schema_str=json.dumps(contract_to_avro_schema(contract))
)
producer = Producer(kafka_conf)
def send_data(record: dict):
"""Отправка данных согласно контракту"""
topic = f"{contract['metadata']['namespace']}.{contract['metadata']['name']}.raw"
producer.produce(
topic=topic,
value=avro_serializer(record, SerializationContext(topic, MessageField.VALUE)),
callback=delivery_report
)
producer.flush()
Вариант 2: HTTP API (через API Gateway)¶
import requests
import io
import avro.schema
import avro.io
API_GATEWAY_URL = "https://data-gateway.company.ru"
def serialize_to_avro(record: dict, schema_json: str) -> bytes:
"""Сериализация записи в Avro"""
schema = avro.schema.parse(schema_json)
writer = avro.io.DatumWriter(schema)
bytes_writer = io.BytesIO()
encoder = avro.io.BinaryEncoder(bytes_writer)
writer.write(record, encoder)
return bytes_writer.getvalue()
def send_data(record: dict, topic: str, contract_version: str, schema_json: str):
"""
Отправка через API Gateway.
⚠️ Важно:
- API Gateway НЕ валидирует данные
- Данные принимаются "as-is" в Kafka .raw
- Валидация происходит потом в Quality Validator
- Поэтому API Gateway быстрый и не блокирует издателя
"""
# Сериализация в Avro
avro_payload = serialize_to_avro(record, schema_json)
response = requests.post(
f"{API_GATEWAY_URL}/api/1.0/{topic}",
data=avro_payload, # Avro bytes
headers={
"Content-Type": "application/avro",
"version": contract_version,
"batch": "1", # количество записей в батче
},
cert=("/secure/certs/{topic}.crt", "/secure/certs/{topic}.key"),
verify="/secure/certs/ca.crt"
)
if response.status_code != 201:
raise Exception(f"Failed to send data: {response.status_code} {response.text}")
return response
Вариант 3: Batch (файлы)¶
import pandas as pd
from datetime import datetime
def export_batch(df: pd.DataFrame, domain: str, entity: str):
"""Экспорт batch данных"""
timestamp = datetime.utcnow().strftime('%Y%m%d_%H%M%S')
filename = f"{domain}_{entity}_{timestamp}.parquet"
# Опциональная client-side валидация (для раннего обнаружения проблем)
# Основная валидация всё равно происходит в Quality Validator
# validate_against_contract(df, f"contracts/domains/{domain}/{entity}/contract.yaml")
# Сохранение
df.to_parquet(f"/data/outbox/{filename}")
Обработка ошибок¶
Что делать, если данные попали в DLQ?¶
- Получите уведомление (Slack/Email)
- Посмотрите детали ошибки:
- Исправьте данные в источнике
- Переотправьте исправленные данные
Частые ошибки¶
| Ошибка | Причина | Решение |
|---|---|---|
Field 'X' is required but null | Обязательное поле пустое | Заполнить поле или сделать optional |
Invalid format for 'email' | Некорректный формат | Проверить валидность данных |
Value exceeds maximum | Число больше лимита | Пересмотреть constraints |
Schema mismatch | Схема изменилась | Обновить версию контракта |
Изменение контракта¶
Non-breaking changes (безопасные)¶
- ✅ Добавление нового nullable поля
- ✅ Изменение description
- ✅ Смягчение constraints (min → меньше)
Breaking changes (опасные)¶
- ⛔ Удаление поля
- ⛔ Изменение типа поля
- ⛔ Добавление required поля
- ⛔ Ужесточение constraints
Процесс изменения¶
- Создайте MR с изменениями контракта
- CI определит тип изменения
- Для breaking changes:
- Уведомите всех consumers
- Согласуйте migration plan
- Получите explicit approval
- После merge — обновите ваш сервис
SLA: Ваша ответственность¶
Как owner контракта, вы отвечаете за:
| SLA | Ваша обязанность |
|---|---|
| Availability | Данные поступают согласно расписанию |
| Freshness | Данные не старше указанного возраста |
| Response Time | Реакция на инциденты в срок |
| Quality | Данные соответствуют правилам |
Уровни severity¶
| Severity | Response Time | Пример |
|---|---|---|
| 🔴 Critical | 15 мин | DLQ > 1000, нет данных > 2ч |
| 🟠 High | 1 час | DLQ > 100, нарушение SLA |
| 🟡 Medium | 4 часа | Нарастающие ошибки |
| ⚪ Low | 1 день | Косметические проблемы |
Чек-лист перед релизом¶
- Контракт создан и прошёл review
- Quality rules покрывают критичные поля
- SLA определён и реалистичен
- Runbook создан (как реагировать на проблемы)
- Алерты настроены (Slack/Email)
- Consumers уведомлены о новом источнике данных
- Интеграция протестирована на dev/staging
- Метрики настроены (Grafana dashboard)
Получение помощи¶
Каналы поддержки¶
- 💬 Slack:
#data-contracts-help - 📧 Email: data-platform@company.ru
- 📚 Wiki: https://wiki.company.ru/data-contracts
- 🆘 On-call: см. PagerDuty
Office Hours¶
Data Platform team проводит office hours: - Когда: Вторник и четверг, 14:00-15:00 - Где: Zoom link в #data-contracts-help
Версия: 1.0 Последнее обновление: 24 января 2026