Перейти к содержанию

Producer Onboarding Guide

Добро пожаловать!

Этот гайд поможет вам, как издателю данных (Producer), начать работу с системой контрактов данных.

Что такое Data Contract?

Контракт данных — это формальное соглашение между вами (издателем) и потребителями ваших данных. Он определяет:

  • ✅ Какие данные вы отправляете (схема)
  • ✅ Какого качества эти данные (правила)
  • ✅ Кто отвечает за данные (вы!)
  • ✅ Как быстро реагировать на проблемы (SLA)

Quick Start (15 минут)

Шаг 1: Клонируйте репозиторий контрактов

git clone git@gitlab.company.ru:data/contracts.git
cd contracts

Шаг 2: Создайте ветку для вашего контракта

git checkout -b feat/add-{domain}-{entity}-contract
# Пример: git checkout -b feat/add-sales-orders-contract

Шаг 3: Скопируйте шаблон контракта

# Создайте директорию для вашего контракта
mkdir -p domains/{domain}/{entity}

# Скопируйте шаблон
cp templates/contract-template.yaml domains/{domain}/{entity}/contract.yaml
cp templates/quality_rules-template.yml domains/{domain}/{entity}/quality_rules.yml

Шаг 4: Заполните контракт

Откройте domains/{domain}/{entity}/contract.yaml и заполните:

spec_version: "1.0.0"
contract_version: "1.0.0"

metadata:
  name: "your_entity_name"           # ← Имя вашей сущности
  namespace: "your_domain"           # ← Ваш домен
  description: "Описание данных"     # ← Что это за данные?
  owner:
    team: "your-team"                # ← Ваша команда
    email: "your-team@company.ru"    # ← Email для алертов

schema:
  format: "avro"
  fields:
    - name: "id"                     # ← Ваши поля
      type: "string"
      required: true
      description: "Уникальный ID"
    # ... добавьте остальные поля

Шаг 5: Добавьте правила качества

Откройте domains/{domain}/{entity}/quality_rules.yml:

version: "1.0"
rules:
  - name: "id_not_null"
    field: "id"
    type: "not_null"
    severity: "error"

  # Добавьте правила для ваших полей

Шаг 6: Проверьте локально

# Валидация синтаксиса
python ci/validate_contract.py domains/{domain}/{entity}/contract.yaml

# Ожидаемый результат:
# ✓ Contract is valid

Шаг 7: Создайте Merge Request

git add domains/{domain}/{entity}/
git commit -m "feat: add {domain}/{entity} data contract"
git push -u origin feat/add-{domain}-{entity}-contract

Перейдите в GitLab и создайте Merge Request.

Шаг 8: Пройдите Code Review

  • CI/CD автоматически проверит ваш контракт
  • Data Platform team проведёт review
  • После approval — merge!

Интеграция с вашим сервисом

Вариант 1: Kafka Producer (рекомендуется)

from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
import json

# Конфигурация
schema_registry_conf = {'url': 'http://schema-registry:8081'}
kafka_conf = {
    'bootstrap.servers': 'kafka:9092',
    'client.id': 'your-service-name'
}

# Загрузка схемы из контракта
with open('path/to/contract.yaml') as f:
    contract = yaml.safe_load(f)

# Создание producer
schema_registry_client = SchemaRegistryClient(schema_registry_conf)
avro_serializer = AvroSerializer(
    schema_registry_client,
    schema_str=json.dumps(contract_to_avro_schema(contract))
)

producer = Producer(kafka_conf)

def send_data(record: dict):
    """Отправка данных согласно контракту"""
    topic = f"{contract['metadata']['namespace']}.{contract['metadata']['name']}.raw"

    producer.produce(
        topic=topic,
        value=avro_serializer(record, SerializationContext(topic, MessageField.VALUE)),
        callback=delivery_report
    )
    producer.flush()

Вариант 2: HTTP API (через API Gateway)

import requests
import io
import avro.schema
import avro.io

API_GATEWAY_URL = "https://data-gateway.company.ru"

def serialize_to_avro(record: dict, schema_json: str) -> bytes:
    """Сериализация записи в Avro"""
    schema = avro.schema.parse(schema_json)
    writer = avro.io.DatumWriter(schema)
    bytes_writer = io.BytesIO()
    encoder = avro.io.BinaryEncoder(bytes_writer)
    writer.write(record, encoder)
    return bytes_writer.getvalue()

def send_data(record: dict, topic: str, contract_version: str, schema_json: str):
    """
    Отправка через API Gateway.

    ⚠️ Важно:
    - API Gateway НЕ валидирует данные
    - Данные принимаются "as-is" в Kafka .raw
    - Валидация происходит потом в Quality Validator
    - Поэтому API Gateway быстрый и не блокирует издателя
    """

    # Сериализация в Avro
    avro_payload = serialize_to_avro(record, schema_json)

    response = requests.post(
        f"{API_GATEWAY_URL}/api/1.0/{topic}",
        data=avro_payload,  # Avro bytes
        headers={
            "Content-Type": "application/avro",
            "version": contract_version,
            "batch": "1",  # количество записей в батче
        },
        cert=("/secure/certs/{topic}.crt", "/secure/certs/{topic}.key"),
        verify="/secure/certs/ca.crt"
    )

    if response.status_code != 201:
        raise Exception(f"Failed to send data: {response.status_code} {response.text}")

    return response

Вариант 3: Batch (файлы)

import pandas as pd
from datetime import datetime

def export_batch(df: pd.DataFrame, domain: str, entity: str):
    """Экспорт batch данных"""
    timestamp = datetime.utcnow().strftime('%Y%m%d_%H%M%S')
    filename = f"{domain}_{entity}_{timestamp}.parquet"

    # Опциональная client-side валидация (для раннего обнаружения проблем)
    # Основная валидация всё равно происходит в Quality Validator
    # validate_against_contract(df, f"contracts/domains/{domain}/{entity}/contract.yaml")

    # Сохранение
    df.to_parquet(f"/data/outbox/{filename}")

Обработка ошибок

Что делать, если данные попали в DLQ?

  1. Получите уведомление (Slack/Email)
  2. Посмотрите детали ошибки:
    # Подключитесь к Kafka и посмотрите DLQ
    kafkacat -b kafka:9092 -t {domain}.{entity}_dlq -C -o beginning
    
  3. Исправьте данные в источнике
  4. Переотправьте исправленные данные

Частые ошибки

Ошибка Причина Решение
Field 'X' is required but null Обязательное поле пустое Заполнить поле или сделать optional
Invalid format for 'email' Некорректный формат Проверить валидность данных
Value exceeds maximum Число больше лимита Пересмотреть constraints
Schema mismatch Схема изменилась Обновить версию контракта

Изменение контракта

Non-breaking changes (безопасные)

  • ✅ Добавление нового nullable поля
  • ✅ Изменение description
  • ✅ Смягчение constraints (min → меньше)
# Изменение версии: 1.0.0 → 1.1.0 (MINOR)

Breaking changes (опасные)

  • ⛔ Удаление поля
  • ⛔ Изменение типа поля
  • ⛔ Добавление required поля
  • ⛔ Ужесточение constraints
# Требуется: 1.0.0 → 2.0.0 (MAJOR)
# Необходимо согласование со всеми consumers!

Процесс изменения

  1. Создайте MR с изменениями контракта
  2. CI определит тип изменения
  3. Для breaking changes:
  4. Уведомите всех consumers
  5. Согласуйте migration plan
  6. Получите explicit approval
  7. После merge — обновите ваш сервис

SLA: Ваша ответственность

Как owner контракта, вы отвечаете за:

SLA Ваша обязанность
Availability Данные поступают согласно расписанию
Freshness Данные не старше указанного возраста
Response Time Реакция на инциденты в срок
Quality Данные соответствуют правилам

Уровни severity

Severity Response Time Пример
🔴 Critical 15 мин DLQ > 1000, нет данных > 2ч
🟠 High 1 час DLQ > 100, нарушение SLA
🟡 Medium 4 часа Нарастающие ошибки
⚪ Low 1 день Косметические проблемы

Чек-лист перед релизом

  • Контракт создан и прошёл review
  • Quality rules покрывают критичные поля
  • SLA определён и реалистичен
  • Runbook создан (как реагировать на проблемы)
  • Алерты настроены (Slack/Email)
  • Consumers уведомлены о новом источнике данных
  • Интеграция протестирована на dev/staging
  • Метрики настроены (Grafana dashboard)

Получение помощи

Каналы поддержки

  • 💬 Slack: #data-contracts-help
  • 📧 Email: data-platform@company.ru
  • 📚 Wiki: https://wiki.company.ru/data-contracts
  • 🆘 On-call: см. PagerDuty

Office Hours

Data Platform team проводит office hours: - Когда: Вторник и четверг, 14:00-15:00 - Где: Zoom link в #data-contracts-help


Версия: 1.0 Последнее обновление: 24 января 2026