Architecture Decision Record: Data Contracts¶
ADR-001: Data Contract Architecture¶
Status¶
Accepted
Context¶
Организация сталкивается с проблемами качества данных: - Неконтролируемые изменения схем в источниках - Отсутствие формальной ответственности за данные - Реактивное обнаружение проблем (downstream) - Отсутствие версионирования схем данных
Decision¶
Внедрить архитектуру контрактов данных со следующими компонентами:
┌─────────────────────────────────────────────────────────────────────────────┐
│ ARCHITECTURE OVERVIEW │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ │
│ │ SOURCE │ 1C, CRM, WMS, Mindbox, Applications │
│ │ SYSTEMS │ │
│ └──────┬──────┘ │
│ │ │
│ │ Avro-serialized data │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ CONTRACT REPOSITORY (GitLab) │ │
│ │ ┌─────────────────────────────────────────────────────────────┐ │ │
│ │ │ /contracts │ │ │
│ │ │ ├── domains/{domain}/{entity}/contract.yaml │ │ │
│ │ │ ├── schemas/contract-schema.json │ │ │
│ │ │ └── .gitlab-ci.yml │ │ │
│ │ └─────────────────────────────────────────────────────────────┘ │ │
│ └───────────────────────────────┬─────────────────────────────────────┘ │
│ │ │
│ ┌────────────────────────┼────────────────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ CI/CD │ │ SCHEMA │ │ CATALOG │ │
│ │ VALIDATION │ │ REGISTRY │ │ (opt.) │ │
│ │ (GitLab) │ │ (Confluent)│ │ │ │
│ └─────────────┘ └──────┬──────┘ └─────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ API GATEWAY (mTLS) │ │
│ │ • Authentication • Rate limiting • Routing │ │
│ └───────────────────────────────┬─────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ KAFKA CLUSTER │ │
│ │ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │ │
│ │ │ {domain}. │ │ {domain}. │ │ {domain}. │ │ │
│ │ │ {entity}.raw │ │ {entity}_prod │ │ {entity}_dlq │ │ │
│ │ │ │ │ │ │ │ │ │
│ │ │ • Unvalidated │ │ • Quality OK │ │ • Failed checks │ │ │
│ │ │ • As received │ │ • Ready to use │ │ • Needs fixing │ │ │
│ │ └────────┬────────┘ └────────▲────────┘ └────────▲────────┘ │ │
│ │ │ │ │ │ │
│ │ │ ┌───────┴────────────────────┘ │ │
│ │ │ │ │ │
│ │ └───────────┼─────────────────────────────────────────┐ │ │
│ │ │ │ │ │
│ └───────────────────────┼─────────────────────────────────────────┼───┘ │
│ │ │ │
│ │ │ │
│ ┌───────────────────────┴─────────────────────────────────────────┴───┐ │
│ │ QUALITY VALIDATOR │ │
│ │ │ │
│ │ • Reads from raw topic (batch) │ │
│ │ • Loads quality_rules.yml from contract │ │
│ │ • Validates each record against rules │ │
│ │ • Routes to _prod or _dlq │ │
│ │ • Emits metrics and alerts │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ CONSUMERS │ │ ALERTS │ │
│ │ (BI, ML, │ │ (Slack, │ │
│ │ Analytics) │ │ PagerDuty) │ │
│ └─────────────┘ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Components¶
1. Contract Repository (GitLab)¶
- Назначение: Централизованное хранение контрактов как кода
- Технология: GitLab с CI/CD
- Содержимое: YAML контракты, JSON Schema, CI скрипты
2. CI/CD Validation¶
- Назначение: Автоматическая проверка изменений контрактов
- Технология: GitLab CI/CD
- Проверки:
- Синтаксис YAML
- Соответствие JSON Schema
- Обнаружение breaking changes
- Semantic versioning
3. Schema Registry¶
- Назначение: Централизованное хранение Avro схем для Kafka
- Технология: Confluent Schema Registry
- Функции:
- Версионирование схем
- Compatibility checks
- Сериализация/десериализация
4. API Gateway¶
- Назначение: Единая точка входа для данных
- Технология: Kong / Nginx / custom (Python Flask/FastAPI)
- Функции:
- mTLS аутентификация (CN = topic name)
- Проверка заголовков (version, batch)
- Routing в Kafka
.rawтопик
⚠️ ВАЖНО: API Gateway НЕ валидирует данные! - Принимает Avro bytes "as-is" - Сразу возвращает 201 Created - Не парсит, не трансформирует - Максимально быстрый, не блокирует издателя - Валидация происходит ПОТОМ в Quality Validator
5. Kafka Cluster¶
- Назначение: Message broker для данных
- Технология: Apache Kafka / Confluent
- Topics:
{domain}.{entity}.raw— сырые данные{domain}.{entity}_prod— валидированные{domain}.{entity}_dlq— проблемные
6. Quality Validator¶
- Назначение: Валидация качества данных (асинхронно!)
- Технология: Python (kafka-python, pydantic)
- Функции:
- Batch обработка из
.rawtopic (consumer group) - Загрузка
quality_rules.ymlиз контракта - Применение правил к каждой записи
- Routing в
_prod(валидные) или_dlq(ошибки) - Метрики и алерты при росте DLQ
Почему отдельный процесс? - API Gateway должен быть максимально быстрым - Валидация может быть медленной (сложные правила) - Можно масштабировать независимо (больше партиций, больше consumers) - Не блокирует издателя при росте нагрузки
Data Flow¶
Критический принцип: Валидация НЕ блокирует издателя!
┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐
│ Source │───▶│ API │───▶│ Kafka │───▶│Validator│───▶│Consumer │
│ System │ │ Gateway │ │ (raw) │ │ (async) │ │ │
│ │ │ │ │ │ │ │ │ │
│ │ │ 1. mTLS │ │ 3. batch│ │ 4. valid│ │ │
│ │ │ 2. 201✓ │ │ consume │ │ routing │ │ │
└─────────┘ └─────────┘ └─────────┘ └────┬────┘ └─────────┘
│
НЕ ждём валидации! ├──▶ _prod (OK)
Издатель уже получил 201 │
└──▶ _dlq (Failed)
│
└──▶ Alert Owner
Пошаговый процесс:
- Producer → API Gateway
- Отправляет Avro bytes + заголовки (version, batch)
- API Gateway проверяет только mTLS и заголовки
-
НЕ парсит Avro, НЕ валидирует качество
-
API Gateway → Kafka .raw
- Пишет данные "as-is" в
.rawтопик - Возвращает
201 Createdиздателю -
Издатель свободен! Не ждёт валидации
-
Quality Validator (consumer group)
- Читает батчи из
.rawтопика - Загружает
quality_rules.ymlиз контракта - Применяет правила к каждой записи
-
Работает асинхронно, независимо от издателя
-
Validator → Kafka _prod/_dlq
- Валидные записи →
_prodтопик - Проблемные записи →
_dlqтопик (с деталями ошибки) -
Алерт владельцу при росте DLQ
-
DLQ Process (runbook)
- Владелец получает алерт
- Исправляет данные в источнике
- Переотправляет исправленные данные
- Повторяет пока DLQ не опустеет
Consequences¶
Positive¶
- ✅ Автоматическое обнаружение breaking changes
- ✅ Чёткая ответственность за данные (owner в контракте)
- ✅ Версионирование схем и правил
- ✅ Разделение качественных и проблемных данных
- ✅ Документация как код
Negative¶
- ⚠️ Дополнительная сложность инфраструктуры
- ⚠️ Требует изменения процессов у Publishers
- ⚠️ Learning curve для команд
Risks¶
- Schema Registry как single point of failure (mitigation: HA setup)
- Накопление данных в DLQ (mitigation: мониторинг + алерты + runbook)
- Отставание Quality Validator от
.rawтопика (mitigation: масштабирование consumers)
Alternatives Considered¶
| Alternative | Pros | Cons | Decision |
|---|---|---|---|
| Только dbt tests | Простота | Реактивно, downstream | Rejected |
| Great Expectations | Зрелый продукт | Нет CI/CD интеграции | Rejected |
| Custom validation | Полный контроль | Время на разработку | Selected |
ADR-002: Contract Format (YAML + JSON Schema)¶
Status¶
Accepted
Context¶
Необходимо выбрать формат для описания контрактов данных.
Decision¶
Использовать YAML для описания контрактов с валидацией через JSON Schema.
Rationale¶
- YAML human-readable и легко редактируется
- JSON Schema — industry standard для валидации
- Оба формата поддерживаются в большинстве IDE
- Легко конвертируются друг в друга
ADR-003: Semantic Versioning for Contracts¶
Status¶
Accepted
Context¶
Контракты будут эволюционировать. Нужна система версионирования.
Decision¶
Использовать Semantic Versioning (MAJOR.MINOR.PATCH):
| Изменение | Version bump | Пример |
|---|---|---|
| Breaking change (remove field) | MAJOR | 1.0.0 → 2.0.0 |
| New nullable field | MINOR | 1.0.0 → 1.1.0 |
| Description change | PATCH | 1.0.0 → 1.0.1 |
Consequences¶
- CI/CD автоматически определяет тип изменения
- Breaking changes блокируются до явного подтверждения
- Consumers могут подписаться на конкретную major version
ADR-004: Dead Letter Queue Strategy¶
Status¶
Accepted
Context¶
Данные, не прошедшие валидацию, не должны теряться.
Decision¶
Использовать отдельный Kafka topic {domain}.{entity}_dlq для невалидных данных:
# DLQ record structure
{
"original_record": { ... }, # Исходные данные
"validation_errors": [ # Список ошибок
{
"rule": "order_id_not_null",
"field": "order_id",
"message": "Field is null",
"severity": "error"
}
],
"metadata": {
"timestamp": "2026-01-23T10:30:00Z",
"contract_version": "1.2.0",
"source": "1c_enterprise"
}
}
Consequences¶
- Данные не теряются
- Producer получает детальную информацию об ошибках
- Можно переслать исправленные данные
ADR-005: Storage Layer — Apache Iceberg + Parquet¶
Status¶
Accepted
Context¶
После прохождения данных через Quality Validator и попадания в _prod топик, данные необходимо хранить в Data Lake для долгосрочного хранения и аналитических запросов.
Требования к storage layer: - ACID гарантии для предотвращения inconsistent reads - Schema evolution для безопасного добавления колонок - Time travel для debugging и rollback - Partition evolution для изменения стратегии партиционирования без rewrite - Эффективное хранение (compression, column-level encoding) - Быстрые аналитические запросы (predicate pushdown, columnar format) - Открытый формат (vendor-neutral, interoperability)
Decision¶
Использовать Apache Iceberg для управления таблицами и Apache Parquet для хранения данных.
┌─────────────────────────────────────────────────────────────────────────────┐
│ STORAGE ARCHITECTURE │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────┐ │
│ │ KAFKA _prod │ Quality OK data │
│ │ TOPIC │ │
│ └────────┬────────┘ │
│ │ │
│ │ Avro │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ FLINK / SPARK STREAMING │ │
│ │ • Reads from Kafka _prod │ │
│ │ • Converts Avro → Parquet │ │
│ │ • Writes to Iceberg table │ │
│ └────────────────────────────┬─────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ APACHE ICEBERG TABLE │ │
│ │ ┌──────────────────────────────────────────────────────────────┐ │ │
│ │ │ Metadata Layer (JSON) │ │ │
│ │ │ • Schema │ │ │
│ │ │ • Partition spec │ │ │
│ │ │ • Sort order │ │ │
│ │ │ • Snapshots (time travel) │ │ │
│ │ │ • File manifest │ │ │
│ │ └──────────────────────────────────────────────────────────────┘ │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ ┌──────────────────────────────────────────────────────────────┐ │ │
│ │ │ Data Files (Parquet) │ │ │
│ │ │ • Columnar storage │ │ │
│ │ │ • ZSTD compression │ │ │
│ │ │ • Dictionary encoding for enums │ │ │
│ │ │ • Bloom filters for point lookups │ │ │
│ │ │ • Column statistics (min/max/null_count) │ │ │
│ │ └──────────────────────────────────────────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ QUERY ENGINES │ │
│ │ │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ Trino │ │ Spark │ │ Flink │ │ │
│ │ │ │ │ │ │ │ │ │
│ │ │ • SQL │ │ • Batch │ │ • Stream │ │ │
│ │ │ • Ad-hoc │ │ • ML │ │ • ETL │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Technology Stack¶
Apache Iceberg (Table Format)¶
Возможности: - Hidden Partitioning: Пользователь не видит партиции в запросах
-- Iceberg автоматически использует партиции
SELECT * FROM sales.orders WHERE created_at >= '2026-01-23'
-
Schema Evolution: Безопасное добавление/удаление колонок
-
Time Travel: Чтение данных на определённый момент времени
-
Partition Evolution: Изменение партиционирования без rewrite
Apache Parquet (File Format)¶
Возможности: - Columnar Storage: Читаем только нужные колонки
- Compression: Column-level compression с различными кодеками
ZSTD level 3— default (balance compression/speed)Dictionary encoding— для enum и low cardinality (status, currency)-
Snappy— для быстрых writes -
Bloom Filters: Ускорение point lookups
-
Statistics: Min/max/null_count для data skipping
Metadata Catalog: Gravitino ⭐¶
Выбор каталога: Iceberg требует каталог метаданных для хранения информации о таблицах.
Рекомендация: Gravitino
Почему Gravitino: - ✅ Unified API — один интерфейс для Iceberg, Paimon, Delta Lake, Hudi - ✅ REST API — стандартный протокол, не нужно разрабатывать свой - ✅ Активное развитие — проект активно развивается сообществом - ✅ Open Source — Apache-лицензия, нет vendor lock-in - ✅ Production-ready — используется в production окружениях
Альтернативы (не рекомендуются): - ❌ Hive Metastore — legacy, не поддерживает современные фичи Iceberg - ❌ AWS Glue — vendor lock-in, дорого при масштабировании - ❌ Nessie — проект заморожен, нет активной поддержки - ⚠️ REST Catalog — можно для MVP, но нет unified API для других форматов
Конфигурация:
iceberg:
catalog:
type: "gravitino"
uri: "http://gravitino-server:8090"
warehouse: "s3://data-lake/warehouse"
database: "sales"
table: "orders"
См. подробнее: Iceberg + Parquet Guide
Data Flow: Kafka → Iceberg¶
┌─────────────────────────────────────────────────────────────────────────────┐
│ KAFKA → ICEBERG INGESTION PIPELINE │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ 1️⃣ KAFKA _prod TOPIC │
│ • Format: Avro │
│ • Retention: 30 days │
│ • Throughput: 100-1000 msg/sec │
│ │
│ ▼ │
│ │
│ 2️⃣ FLINK/SPARK STREAMING JOB │
│ • Читает batch из Kafka │
│ • Преобразует Avro → Iceberg schema │
│ • Применяет partitioning (day(created_at), bucket[16](customer_id)) │
│ • Применяет sort order (customer_id, created_at, order_id) │
│ • Пишет Parquet файлы (target size: 512 MB) │
│ │
│ ▼ │
│ │
│ 3️⃣ S3 DATA LAKE │
│ s3://data-lake/warehouse/sales.db/orders/ │
│ ├── metadata/ │
│ │ ├── v1.metadata.json │
│ │ ├── v2.metadata.json │
│ │ ├── snap-123456789.avro (manifest list) │
│ │ └── 00000-manifest.avro (manifest) │
│ └── data/ │
│ ├── created_at_day=2026-01-23/ │
│ │ ├── customer_id_bucket=0/ │
│ │ │ └── 00000-0-a1b2c3d4.parquet (512 MB) │
│ │ ├── customer_id_bucket=1/ │
│ │ │ └── 00001-0-e5f6g7h8.parquet │
│ │ └── ... │
│ └── created_at_day=2026-01-24/ │
│ │
│ ▼ │
│ │
│ 4️⃣ QUERY ENGINES │
│ • Trino: Ad-hoc SQL analytics │
│ • Spark: Batch processing, ML │
│ • Flink: Streaming ETL │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Integration with Data Contracts¶
physical_layout.yml теперь специфичен для Iceberg/Parquet:
# contracts/domains/sales/orders/physical_layout.yml
iceberg:
format_version: 2
catalog:
type: "gravitino" # Рекомендуется: unified metadata catalog
uri: "http://gravitino-server:8090"
warehouse: "s3://data-lake/warehouse"
database: "sales"
table: "orders"
partitioning:
strategy: "iceberg_hidden"
spec:
- source_column: "created_at"
transform: "day"
partition_field: "created_at_day"
- source_column: "customer_id"
transform: "bucket[16]"
partition_field: "customer_id_bucket"
sort_order:
enabled: true
fields:
- column: "customer_id"
direction: "asc"
- column: "created_at"
direction: "desc"
parquet:
compression:
codec: "zstd"
level: 3
bloom_filters:
- column: "order_id"
fpp: 0.01
- column: "customer_id"
fpp: 0.01
compaction:
small_files:
enabled: true
target_file_size_mb: 512
schedule: "0 2 * * *"
snapshots:
retention:
min_snapshots_to_keep: 10
max_age_days: 30
Benefits¶
| Feature | Traditional DB | Delta Lake | Apache Iceberg |
|---|---|---|---|
| ACID | ✅ | ✅ | ✅ |
| Schema Evolution | ⚠️ Blocking ALTER TABLE | ✅ | ✅ |
| Time Travel | ❌ (backups only) | ✅ | ✅ |
| Partition Evolution | ❌ Требует rewrite | ⚠️ Сложно | ✅ Hidden partitioning |
| Hidden Partitioning | ❌ | ❌ | ✅ |
| Multi-engine Support | ❌ Vendor lock-in | ⚠️ Spark-first | ✅ Trino, Spark, Flink |
| Open Format | ❌ | ⚠️ Databricks ecosystem | ✅ Apache Foundation |
| Row-level Updates | ✅ | ✅ (v2) | ✅ (v2) |
Consequences¶
Positive¶
- ✅ Schema Evolution: Безопасное добавление колонок без downtime
- ✅ Time Travel: Debugging и rollback без backups
- ✅ Partition Evolution: Оптимизация без rewrite
- ✅ Open Format: Vendor-neutral, работает с Trino/Spark/Flink
- ✅ Performance: Columnar format + predicate pushdown + bloom filters
- ✅ Cost Efficiency: ZSTD compression ~4.5x, S3 tiering (hot/warm/cold)
Negative¶
- ⚠️ Learning Curve: Команде нужно изучить Iceberg concepts
- ⚠️ Metadata Overhead: Iceberg добавляет metadata files (manifest, snapshots)
- ⚠️ Compaction Required: Small files нужно регулярно compactить
Operational Considerations¶
Maintenance Tasks:
-- Expire старые snapshots (weekly)
CALL system.expire_snapshots('sales.orders', TIMESTAMP '2025-12-01', 10);
-- Remove orphan files (weekly)
CALL system.remove_orphan_files('sales.orders');
-- Compact small files (daily)
CALL system.rewrite_data_files('sales.orders',
WHERE => 'created_at >= current_date() - 7');
-- Rewrite data для Z-ordering (monthly)
CALL system.rewrite_data_files('sales.orders',
STRATEGY => 'sort',
SORT_ORDER => 'zorder(customer_id, created_at)');
Monitoring:
-- Table size
SELECT SUM(file_size_in_bytes) / 1024^3 AS size_gb
FROM sales.orders.files;
-- Small files count
SELECT COUNT(*) FROM sales.orders.files
WHERE file_size_in_bytes < 128*1024^2;
-- Snapshot age
SELECT NOW() - MAX(committed_at) AS age
FROM sales.orders.snapshots;
-- Query performance (via Trino query logs)
SELECT query_id, query, elapsed_time_ms
FROM system.runtime.queries
WHERE query LIKE '%sales.orders%'
ORDER BY elapsed_time_ms DESC LIMIT 10;
Alternatives Considered¶
| Alternative | Pros | Cons | Decision |
|---|---|---|---|
| Delta Lake | Mature, Databricks support | Spark-first, vendor tie-in | Rejected |
| Apache Hudi | Copy-on-write efficient | Сложная архитектура | Rejected |
| Traditional DB | Simple, ACID | Not scalable, expensive | Rejected |
| Raw Parquet | Simple, cheap | No ACID, no schema evolution | Rejected |
| Apache Iceberg | ✅ Open, multi-engine, hidden partitioning | Learning curve | Selected |
Migration Path¶
Для существующих данных:
# 1. Создать Iceberg таблицу
CREATE TABLE sales.orders (...)
USING iceberg
PARTITIONED BY (day(created_at), bucket(16, customer_id))
LOCATION 's3://data-lake/warehouse/sales.db/orders';
# 2. Импорт из Kafka _prod topic
spark-submit \
--class com.company.KafkaToIceberg \
--conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
kafka-to-iceberg.jar \
--topic sales.orders_prod \
--table local.sales.orders
# 3. Enable incremental ingestion
flink run \
-c com.company.KafkaIcebergSink \
kafka-iceberg-connector.jar \
--kafka.topic sales.orders_prod \
--iceberg.table sales.orders \
--checkpoint.location s3://checkpoints/sales.orders
Версия: 1.1 Обновлено: 2026-01-23 (добавлен ADR-005) Автор: Data Platform Team