CI/CD Pipeline Guide
Overview
The CI/CD pipeline automatically validates every data contract change before it is merged.
┌───────────────────────────────────────────────────────────────────────────┐
│ CI/CD PIPELINE OVERVIEW │
├───────────────────────────────────────────────────────────────────────────┤
│ │
│ Developer creates/updates contract │
│ │ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ Create MR │ │
│ └────────┬────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ PIPELINE STAGES │ │
│ │ │ │
│ │ Stage 1 Stage 2 Stage 3 Stage 4 │ │
│ │ ┌──────────┐ ┌──────────────┐ ┌──────────────┐ ┌────────────┐ │ │
│ │ │ Validate │───▶│ Breaking │─▶│ Version │▶│ Deploy │ │ │
│ │ │ Syntax │ │ Changes │ │ Check │ │ (on merge) │ │ │
│ │ └──────────┘ └──────────────┘ └──────────────┘ └────────────┘ │ │
│ │ │ │ │ │ │ │
│ │ ▼ ▼ ▼ ▼ │ │
│ │ ✓ YAML valid ✓ Compatible ✓ Version OK ✓ Schema Reg │ │
│ │ ✓ Schema OK ✗ Breaking ✗ Bump needed ✓ Catalog │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ MR Status │ │
│ │ ✓ Ready / ✗ │ │
│ └─────────────────┘ │
│ │
└───────────────────────────────────────────────────────────────────────────┘
GitLab CI Configuration
.gitlab-ci.yml
# .gitlab-ci.yml
stages:
  - validate
  - analyze
  - version
  - deploy

variables:
  PYTHON_VERSION: "3.11"
  SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
# ═══════════════════════════════════════════════════════════════════════════
# STAGE 1: VALIDATE
# ═══════════════════════════════════════════════════════════════════════════
validate:syntax:
  stage: validate
  image: python:${PYTHON_VERSION}-slim
  script:
    - pip install -r requirements.txt
    - python ci/validate_contract.py --all
  rules:
    - changes:
        - "domains/**/*.yaml"
        - "domains/**/*.yml"

validate:schema:
  stage: validate
  image: python:${PYTHON_VERSION}-slim
  script:
    - pip install -r requirements.txt
    - python ci/validate_against_schema.py --all
  rules:
    - changes:
        - "domains/**/*.yaml"
        - "domains/**/*.yml"

validate:quality-rules:
  stage: validate
  image: python:${PYTHON_VERSION}-slim
  script:
    - pip install -r requirements.txt
    - python ci/validate_quality_rules.py --all
  rules:
    - changes:
        - "domains/**/quality_rules.yml"
# ═══════════════════════════════════════════════════════════════════════════
# STAGE 2: ANALYZE
# ═══════════════════════════════════════════════════════════════════════════
analyze:breaking-changes:
  stage: analyze
  image: python:${PYTHON_VERSION}-slim
  script:
    - pip install -r requirements.txt
    - |
      python ci/detect_breaking_changes.py \
        --base-ref origin/${CI_MERGE_REQUEST_TARGET_BRANCH_NAME} \
        --head-ref HEAD \
        --output report.json
  artifacts:
    reports:
      dotenv: breaking_changes.env
    paths:
      - report.json
    expire_in: 1 week
  rules:
    - if: '$CI_PIPELINE_SOURCE == "merge_request_event"'
      changes:
        - "domains/**/*.yaml"

analyze:impact:
  stage: analyze
  image: python:${PYTHON_VERSION}-slim
  script:
    - pip install -r requirements.txt
    - python ci/analyze_impact.py --changed-contracts
  rules:
    - if: '$CI_PIPELINE_SOURCE == "merge_request_event"'
      changes:
        - "domains/**/*.yaml"
# ═══════════════════════════════════════════════════════════════════════════
# STAGE 3: VERSION
# ═══════════════════════════════════════════════════════════════════════════
version:check:
  stage: version
  image: python:${PYTHON_VERSION}-slim
  script:
    - pip install -r requirements.txt
    - python ci/check_version_bump.py --changed-contracts
  rules:
    - if: '$CI_PIPELINE_SOURCE == "merge_request_event"'
      changes:
        - "domains/**/*.yaml"

version:suggest:
  stage: version
  image: python:${PYTHON_VERSION}-slim
  script:
    - pip install -r requirements.txt
    - |
      python ci/suggest_version.py \
        --base-ref origin/${CI_MERGE_REQUEST_TARGET_BRANCH_NAME} \
        --output suggested_version.txt
    - cat suggested_version.txt
  artifacts:
    paths:
      - suggested_version.txt
    expire_in: 1 day
  rules:
    - if: '$CI_PIPELINE_SOURCE == "merge_request_event"'
      changes:
        - "domains/**/*.yaml"
  allow_failure: true
# ═══════════════════════════════════════════════════════════════════════════
# STAGE 4: DEPLOY
# ═══════════════════════════════════════════════════════════════════════════
deploy:schema-registry:
  stage: deploy
  image: python:${PYTHON_VERSION}-slim
  script:
    - pip install -r requirements.txt
    - python ci/deploy_to_registry.py --all
  rules:
    - if: '$CI_COMMIT_BRANCH == "main"'
      changes:
        - "domains/**/*.yaml"
  environment:
    name: production
    url: $SCHEMA_REGISTRY_URL

deploy:catalog:
  stage: deploy
  image: python:${PYTHON_VERSION}-slim
  script:
    - pip install -r requirements.txt
    - python ci/sync_to_catalog.py --all
  rules:
    - if: '$CI_COMMIT_BRANCH == "main"'
      changes:
        - "domains/**/*.yaml"
        - "domains/**/*.yml"
  allow_failure: true

deploy:notify:
  stage: deploy
  image: curlimages/curl:latest
  script:
    - |
      curl -X POST $SLACK_WEBHOOK_URL \
        -H 'Content-type: application/json' \
        -d '{
          "text": "📋 Data Contracts updated",
          "blocks": [
            {
              "type": "section",
              "text": {
                "type": "mrkdwn",
                "text": "New contract versions deployed to production.\n*Commit:* '"${CI_COMMIT_SHORT_SHA}"'\n*Author:* '"${CI_COMMIT_AUTHOR}"'"
              }
            }
          ]
        }'
  rules:
    - if: '$CI_COMMIT_BRANCH == "main"'
      changes:
        - "domains/**/*.yaml"
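The `deploy:notify` job assembles its JSON payload through shell quoting, which produces invalid JSON if the commit author contains a double quote or a backslash. As a hedged alternative, the sketch below builds the same message with a JSON encoder. The function names `build_slack_payload` and `send_notification` are hypothetical; the environment variables are the ones the job already uses. Running it would require a Python image rather than `curlimages/curl`.

```python
import json
import os
import urllib.request


def build_slack_payload(short_sha: str, author: str) -> dict:
    """Build the same message as the curl call, as a plain dict."""
    text = (
        "New contract versions deployed to production.\n"
        f"*Commit:* {short_sha}\n"
        f"*Author:* {author}"
    )
    return {
        "text": "📋 Data Contracts updated",
        "blocks": [
            {"type": "section", "text": {"type": "mrkdwn", "text": text}}
        ],
    }


def send_notification(webhook_url: str, payload: dict) -> int:
    """POST the payload as JSON and return the HTTP status code."""
    request = urllib.request.Request(
        webhook_url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=10) as response:
        return response.status


if __name__ == "__main__":
    payload = build_slack_payload(
        os.environ.get("CI_COMMIT_SHORT_SHA", "unknown"),
        os.environ.get("CI_COMMIT_AUTHOR", "unknown"),
    )
    webhook_url = os.environ.get("SLACK_WEBHOOK_URL")
    if webhook_url:
        print(send_notification(webhook_url, payload))
    else:
        # Without a webhook configured, just show what would be sent.
        print(json.dumps(payload, ensure_ascii=False, indent=2))
```

Because the encoder escapes special characters, the author string can contain quotes without breaking the request body.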
Pipeline Stages Detailed
Stage 1: Validate
validate:syntax
Checks the syntax of YAML files.
# ci/validate_contract.py
import sys
from pathlib import Path

import yaml


def validate_yaml_syntax(file_path: Path) -> bool:
    """Return True if the file parses as YAML; print the error otherwise."""
    try:
        with open(file_path) as f:
            yaml.safe_load(f)
        return True
    except yaml.YAMLError as e:
        print(f"❌ {file_path}: {e}")
        return False


def main():
    # Every contract.yaml under domains/ is checked, regardless of arguments.
    contracts = list(Path("domains").rglob("contract.yaml"))
    errors = []
    for contract in contracts:
        if not validate_yaml_syntax(contract):
            errors.append(contract)
    if errors:
        print(f"\n❌ {len(errors)} contracts have syntax errors")
        sys.exit(1)
    print(f"✓ All {len(contracts)} contracts have valid syntax")


if __name__ == "__main__":
    main()
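The CI job calls this script with `--all`, and the Troubleshooting section calls it with a single file path, but the `main()` shown above does not read its arguments. A minimal sketch of how both invocations could be honored follows. The helper name `resolve_targets` and the fallback to all contracts when no path is given are assumptions, not part of the original script.

```python
import argparse
from pathlib import Path


def resolve_targets(argv: list[str], root: Path = Path("domains")) -> list[Path]:
    """Return the contract files selected by the command line."""
    parser = argparse.ArgumentParser(description="Validate contract YAML syntax")
    parser.add_argument("paths", nargs="*", type=Path, help="contract files to check")
    parser.add_argument("--all", action="store_true", help="check every contract under the root")
    args = parser.parse_args(argv)
    if args.all or not args.paths:
        # Same discovery rule as main(): every contract.yaml under domains/.
        return sorted(root.rglob("contract.yaml"))
    return args.paths
```

`main()` could then iterate over `resolve_targets(sys.argv[1:])` instead of always scanning the whole `domains` tree.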
validate:schema
Validates each contract against the JSON Schema.
# ci/validate_against_schema.py
import json
import sys
from pathlib import Path

import yaml
from jsonschema import validate, ValidationError


def load_contract_schema() -> dict:
    """Load the JSON Schema for contracts."""
    with open("schemas/contract-schema.json") as f:
        return json.load(f)


def validate_contract(contract_path: Path, schema: dict) -> list:
    """Validate one contract; return a list of error messages (empty if valid)."""
    errors = []
    with open(contract_path) as f:
        contract = yaml.safe_load(f)
    try:
        validate(instance=contract, schema=schema)
    except ValidationError as e:
        # validate() raises on the first violation it reports
        errors.append(f"{contract_path}: {e.message}")
    return errors


def main():
    schema = load_contract_schema()
    contracts = list(Path("domains").rglob("contract.yaml"))
    all_errors = []
    for contract in contracts:
        errors = validate_contract(contract, schema)
        all_errors.extend(errors)
    if all_errors:
        for error in all_errors:
            print(f"❌ {error}")
        sys.exit(1)
    print(f"✓ All {len(contracts)} contracts conform to schema")


if __name__ == "__main__":
    main()
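This guide does not show `schemas/contract-schema.json`. Purely as an illustration, a schema covering the keys that the scripts in this guide read (`metadata.name`, `metadata.namespace`, `contract_version`, `schema.fields`) might look like the sketch below. The layout, the version pattern, and the helper `missing_required` are assumptions. The helper only walks `required` keys and is a simplified stand-in, not a replacement for `jsonschema.validate`.

```python
# Hypothetical contract schema; the real schemas/contract-schema.json may differ.
CONTRACT_SCHEMA = {
    "$schema": "http://json-schema.org/draft-07/schema#",
    "type": "object",
    "required": ["metadata", "contract_version", "schema"],
    "properties": {
        "metadata": {
            "type": "object",
            "required": ["name", "namespace"],
            "properties": {
                "name": {"type": "string"},
                "namespace": {"type": "string"},
            },
        },
        "contract_version": {"type": "string", "pattern": r"^\d+\.\d+\.\d+$"},
        "schema": {
            "type": "object",
            "required": ["fields"],
            "properties": {"fields": {"type": "array"}},
        },
    },
}


def missing_required(instance: dict, schema: dict, path: str = "") -> list[str]:
    """List dotted paths of required keys that are absent (objects only)."""
    missing = []
    for key in schema.get("required", []):
        if key not in instance:
            missing.append(f"{path}{key}")
    for key, subschema in schema.get("properties", {}).items():
        if isinstance(instance.get(key), dict):
            missing.extend(missing_required(instance[key], subschema, f"{path}{key}."))
    return missing


contract = {"metadata": {"name": "orders"}, "contract_version": "1.2.0"}
print(missing_required(contract, CONTRACT_SCHEMA))
# ['schema', 'metadata.namespace']
```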
Stage 2: Analyze
analyze:breaking-changes
Detects breaking changes between contract versions.
# ci/detect_breaking_changes.py
import subprocess
from enum import Enum
from pathlib import Path

import yaml
from pydantic import BaseModel


class ChangeType(str, Enum):
    BREAKING = "breaking"
    NON_BREAKING = "non_breaking"
    PATCH = "patch"


class Change(BaseModel):
    """Represents a change detected in a contract."""
    contract: str
    field: str
    change_type: ChangeType
    description: str
    model_config = {
        "use_enum_values": True,
    }


def get_file_at_ref(file_path: str, ref: str) -> str | None:
    """Return the file content at a git ref, or None if it does not exist there."""
    result = subprocess.run(
        ["git", "show", f"{ref}:{file_path}"],
        capture_output=True,
        text=True
    )
    return result.stdout if result.returncode == 0 else None


def detect_breaking_changes(old_contract: dict, new_contract: dict) -> list[Change]:
    """Classify field-level changes between two contract versions."""
    changes = []
    old_fields = {f['name']: f for f in old_contract.get('schema', {}).get('fields', [])}
    new_fields = {f['name']: f for f in new_contract.get('schema', {}).get('fields', [])}

    # Removed fields = BREAKING
    for field_name in old_fields:
        if field_name not in new_fields:
            changes.append(Change(
                contract=old_contract['metadata']['name'],
                field=field_name,
                change_type=ChangeType.BREAKING,
                description=f"Field '{field_name}' was removed"
            ))

    for field_name, new_field in new_fields.items():
        if field_name in old_fields:
            old_field = old_fields[field_name]
            # Type changes = BREAKING
            if old_field.get('type') != new_field.get('type'):
                changes.append(Change(
                    contract=old_contract['metadata']['name'],
                    field=field_name,
                    change_type=ChangeType.BREAKING,
                    description=f"Field '{field_name}' type changed from {old_field.get('type')} to {new_field.get('type')}"
                ))
            # Required: false -> true = BREAKING
            if not old_field.get('required') and new_field.get('required'):
                changes.append(Change(
                    contract=old_contract['metadata']['name'],
                    field=field_name,
                    change_type=ChangeType.BREAKING,
                    description=f"Field '{field_name}' became required"
                ))

    # New fields: required = BREAKING, optional = NON_BREAKING
    for field_name, new_field in new_fields.items():
        if field_name not in old_fields:
            if new_field.get('required', False):
                changes.append(Change(
                    contract=old_contract['metadata']['name'],
                    field=field_name,
                    change_type=ChangeType.BREAKING,
                    description=f"New required field '{field_name}' added"
                ))
            else:
                changes.append(Change(
                    contract=old_contract['metadata']['name'],
                    field=field_name,
                    change_type=ChangeType.NON_BREAKING,
                    description=f"New optional field '{field_name}' added"
                ))
    return changes


def main():
    # Parse arguments and run detection
    # Output results as JSON
    pass
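The `main()` above is left as a stub. A hedged sketch of the plumbing it would need is shown below, using the `--base-ref`, `--head-ref`, and `--output` flags that the CI job passes. The helper names, the `has_breaking` key, and the overall report layout are assumptions; the guide does not define the format of `report.json`.

```python
import argparse
import json
import subprocess


def parse_args(argv=None) -> argparse.Namespace:
    """Parse the flags used by the analyze:breaking-changes job."""
    parser = argparse.ArgumentParser(description="Detect breaking contract changes")
    parser.add_argument("--base-ref", required=True)
    parser.add_argument("--head-ref", default="HEAD")
    parser.add_argument("--output", default="report.json")
    return parser.parse_args(argv)


def changed_contracts(base_ref: str, head_ref: str) -> list[str]:
    """List contract.yaml files under domains/ that differ between the refs."""
    result = subprocess.run(
        ["git", "diff", "--name-only", f"{base_ref}...{head_ref}", "--", "domains"],
        capture_output=True,
        text=True,
        check=True,
    )
    return [p for p in result.stdout.splitlines() if p.endswith("contract.yaml")]


def write_report(changes: list[dict], output: str) -> bool:
    """Write the changes as JSON; return True if any change is breaking."""
    has_breaking = any(c["change_type"] == "breaking" for c in changes)
    with open(output, "w") as f:
        json.dump({"has_breaking": has_breaking, "changes": changes}, f, indent=2)
    return has_breaking
```

A full `main()` would load each changed file at both refs with `get_file_at_ref`, pass the parsed contracts to `detect_breaking_changes`, and hand the serialized `Change` objects to `write_report`. Whether the job should fail when breaking changes are found is a policy decision this sketch leaves open.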
Stage 3: Version
version:check
Checks that the contract version was bumped correctly.
# ci/check_version_bump.py
from pathlib import Path

import yaml
from packaging import version

# ChangeType is defined in ci/detect_breaking_changes.py (same directory)
from detect_breaking_changes import ChangeType


def check_version_bump(old_contract: dict, new_contract: dict, changes: list) -> tuple[bool, str]:
    """Check that the version bump matches the detected changes."""
    old_version = version.parse(old_contract.get('contract_version', '0.0.0'))
    new_version = version.parse(new_contract.get('contract_version', '0.0.0'))
    has_breaking = any(c.change_type == ChangeType.BREAKING for c in changes)
    has_non_breaking = any(c.change_type == ChangeType.NON_BREAKING for c in changes)
    if has_breaking:
        # Must bump MAJOR
        if new_version.major <= old_version.major:
            return False, f"Breaking changes require MAJOR version bump: {old_version} -> {old_version.major + 1}.0.0"
    elif has_non_breaking:
        # Must bump at least MINOR (a MAJOR bump also satisfies this)
        if (new_version.major, new_version.minor) <= (old_version.major, old_version.minor):
            return False, f"Non-breaking changes require MINOR version bump: {old_version} -> {old_version.major}.{old_version.minor + 1}.0"
    else:
        # PATCH for description changes
        if new_version <= old_version:
            return False, "Changes require at least PATCH version bump"
    return True, f"Version bump OK: {old_version} -> {new_version}"
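To make the rule concrete, the sketch below restates it with plain tuples instead of `packaging.version` and runs it on a few version pairs. The helper names `parse_semver` and `bump_is_sufficient` are hypothetical, and the sketch assumes versions are always three dot-separated integers.

```python
def parse_semver(text: str) -> tuple[int, int, int]:
    """Split 'MAJOR.MINOR.PATCH' into a tuple of integers."""
    major, minor, patch = (int(part) for part in text.split("."))
    return major, minor, patch


def bump_is_sufficient(old: str, new: str, change_types: set[str]) -> bool:
    """Apply the MAJOR / MINOR / PATCH rule to a pair of versions."""
    old_v, new_v = parse_semver(old), parse_semver(new)
    if "breaking" in change_types:
        return new_v[0] > old_v[0]      # MAJOR must increase
    if "non_breaking" in change_types:
        return new_v[:2] > old_v[:2]    # MINOR (or MAJOR) must increase
    return new_v > old_v                # any increase is enough


examples = [
    ("1.2.0", "1.3.0", {"non_breaking"}),             # True
    ("1.2.0", "1.2.1", {"non_breaking"}),             # False: needs MINOR
    ("1.2.0", "1.3.0", {"breaking"}),                 # False: needs MAJOR
    ("1.2.0", "2.0.0", {"breaking", "non_breaking"}), # True
    ("1.2.0", "1.2.1", set()),                        # True: PATCH is enough
]
for old, new, kinds in examples:
    print(old, "->", new, sorted(kinds), bump_is_sufficient(old, new, kinds))
```

The first case matches the recommendation shown in the bot comment: two added optional fields move `1.2.0` to `1.3.0`.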
Stage 4: Deploy
deploy:schema-registry
Publishes schemas to the Schema Registry.
# ci/deploy_to_registry.py
import json
import os
from pathlib import Path

import requests
import yaml

SCHEMA_REGISTRY_URL = os.environ.get('SCHEMA_REGISTRY_URL', 'http://localhost:8081')


def contract_to_avro_schema(contract: dict) -> dict:
    """Convert a contract schema to an Avro record schema."""
    fields = []
    for field in contract['schema']['fields']:
        avro_field = {
            "name": field['name'],
            # map_type_to_avro() is a helper that is not shown in this guide
            "type": map_type_to_avro(field['type'], field.get('required', True))
        }
        if field.get('default') is not None:
            avro_field['default'] = field['default']
        fields.append(avro_field)
    return {
        "type": "record",
        "name": contract['metadata']['name'],
        "namespace": contract['metadata']['namespace'],
        "fields": fields
    }


def deploy_schema(contract_path: Path) -> bool:
    """Register the contract's Avro schema; return True on success."""
    with open(contract_path) as f:
        contract = yaml.safe_load(f)
    avro_schema = contract_to_avro_schema(contract)
    subject = f"{contract['metadata']['namespace']}.{contract['metadata']['name']}-value"
    response = requests.post(
        f"{SCHEMA_REGISTRY_URL}/subjects/{subject}/versions",
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
        json={"schema": json.dumps(avro_schema)}
    )
    if response.status_code in [200, 201]:
        # The registry responds with the schema id, not the subject version
        print(f"✓ Deployed {subject} (schema id {response.json().get('id')})")
    else:
        print(f"❌ Failed to deploy {subject}: {response.text}")
        return False
    return True
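`contract_to_avro_schema` calls `map_type_to_avro`, which this guide does not define. One plausible shape for it is sketched below: optional fields become a union with `null`, required fields map directly. The contract type names on the left side of `AVRO_TYPES` are assumptions; only the Avro primitive names on the right come from the Avro specification.

```python
# Hypothetical mapping from contract type names to Avro primitive types.
AVRO_TYPES = {
    "string": "string",
    "integer": "int",
    "long": "long",
    "float": "float",
    "double": "double",
    "boolean": "boolean",
    "bytes": "bytes",
}


def map_type_to_avro(contract_type: str, required: bool = True):
    """Return an Avro type; optional fields become a union with null."""
    try:
        avro_type = AVRO_TYPES[contract_type]
    except KeyError:
        raise ValueError(f"Unsupported contract type: {contract_type!r}") from None
    return avro_type if required else ["null", avro_type]


print(map_type_to_avro("integer"))          # int
print(map_type_to_avro("string", False))    # ['null', 'string']
```

Failing loudly on an unknown type keeps a typo in a contract from silently producing an invalid Avro schema at deploy time.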
MR Labels & Automation
Automatic Labels
# .gitlab/merge_request_templates/contract.md
## Contract Change
**Contract:** <!-- e.g., sales/orders -->
**Change Type:** <!-- breaking / non-breaking / patch -->
### Changes
- [ ] Schema changes
- [ ] Quality rules changes
- [ ] SLA changes
- [ ] Metadata changes
### Checklist
- [ ] Version bumped appropriately
- [ ] Consumers notified (for breaking changes)
- [ ] Runbook updated (if needed)
- [ ] Tests passed
/label ~"data-contract" ~"needs-review"
Bot Comments
The pipeline automatically posts a comment on the MR:
## 🤖 Contract Analysis Report
### Changes Detected
| Contract | Change Type | Details |
|----------|-------------|---------|
| sales/orders | 🟡 Non-breaking | Added field: `discount` |
| sales/orders | 🟡 Non-breaking | Added field: `promo_code` |
### Version Recommendation
Current: `1.2.0` → Suggested: `1.3.0` (MINOR bump)
### Affected Consumers
| Consumer | Criticality | Contact |
|----------|-------------|---------|
| analytics_team | High | @analytics |
| ml_team | Medium | @ml |
---
*This comment is auto-generated by CI pipeline*
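A comment like the one above can be rendered from the detected changes with plain string formatting. The sketch below is one way to do it, assuming each change is a dict with the `contract`, `change_type`, and `description` keys of the `Change` model. The function name `render_report` and the 🔴 and 🟢 icons are assumptions; only 🟡 appears in the sample. How the comment is posted to the MR is not covered by this guide and is left out.

```python
def render_report(changes: list[dict], current: str, suggested: str) -> str:
    """Render the analysis report as Markdown for an MR comment."""
    labels = {
        "breaking": "🔴 Breaking",
        "non_breaking": "🟡 Non-breaking",
        "patch": "🟢 Patch",
    }
    lines = [
        "## 🤖 Contract Analysis Report",
        "### Changes Detected",
        "| Contract | Change Type | Details |",
        "|----------|-------------|---------|",
    ]
    for change in changes:
        # Fall back to the raw value for change types without a label.
        label = labels.get(change["change_type"], change["change_type"])
        lines.append(f"| {change['contract']} | {label} | {change['description']} |")
    lines += [
        "### Version Recommendation",
        f"Current: `{current}` → Suggested: `{suggested}`",
        "---",
        "*This comment is auto-generated by CI pipeline*",
    ]
    return "\n".join(lines)


sample = [
    {"contract": "sales/orders", "change_type": "non_breaking", "description": "Added field: `discount`"},
    {"contract": "sales/orders", "change_type": "non_breaking", "description": "Added field: `promo_code`"},
]
print(render_report(sample, "1.2.0", "1.3.0"))
```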
Troubleshooting
Pipeline Fails on validate:syntax
# Run locally
python ci/validate_contract.py domains/sales/orders/contract.yaml
# Common issues:
# - Incorrect indentation
# - Missing quotes around special characters
# - Invalid YAML syntax
Pipeline Fails on analyze:breaking-changes
# View the breaking changes report
cat report.json | jq '.'
# If breaking changes are intentional:
# 1. Bump MAJOR version
# 2. Add /label ~"breaking-change" to MR
# 3. Get explicit approval from all consumers
Pipeline Fails on version:check
# Check current and required version
python ci/suggest_version.py --contract domains/sales/orders/contract.yaml
# Fix: Update contract_version in contract.yaml
Version: 1.0. Last updated: January 24, 2026