
CI/CD Pipeline Guide

Overview

The CI/CD pipeline automatically validates every data contract change before it is merged.

┌───────────────────────────────────────────────────────────────────────────┐
│                        CI/CD PIPELINE OVERVIEW                             │
├───────────────────────────────────────────────────────────────────────────┤
│                                                                           │
│  Developer creates/updates contract                                       │
│           │                                                               │
│           ▼                                                               │
│  ┌─────────────────┐                                                      │
│  │  Create MR      │                                                      │
│  └────────┬────────┘                                                      │
│           │                                                               │
│           ▼                                                               │
│  ┌─────────────────────────────────────────────────────────────────────┐  │
│  │                      PIPELINE STAGES                                │  │
│  │                                                                     │  │
│  │  Stage 1          Stage 2           Stage 3          Stage 4       │  │
│  │  ┌──────────┐    ┌──────────────┐  ┌──────────────┐ ┌────────────┐ │  │
│  │  │ Validate │───▶│ Breaking     │─▶│ Version      │▶│ Deploy     │ │  │
│  │  │ Syntax   │    │ Changes      │  │ Check        │ │ (on merge) │ │  │
│  │  └──────────┘    └──────────────┘  └──────────────┘ └────────────┘ │  │
│  │       │                │                 │               │         │  │
│  │       ▼                ▼                 ▼               ▼         │  │
│  │   ✓ YAML valid    ✓ Compatible     ✓ Version OK   ✓ Schema Reg   │  │
│  │   ✓ Schema OK     ✗ Breaking       ✗ Bump needed  ✓ Catalog      │  │
│  │                                                                     │  │
│  └─────────────────────────────────────────────────────────────────────┘  │
│           │                                                               │
│           ▼                                                               │
│  ┌─────────────────┐                                                      │
│  │  MR Status      │                                                      │
│  │  ✓ Ready / ✗    │                                                      │
│  └─────────────────┘                                                      │
│                                                                           │
└───────────────────────────────────────────────────────────────────────────┘

GitLab CI Configuration

.gitlab-ci.yml

# .gitlab-ci.yml

stages:
  - validate
  - analyze
  - version
  - deploy

variables:
  PYTHON_VERSION: "3.11"
  SCHEMA_REGISTRY_URL: "http://schema-registry:8081"

# ═══════════════════════════════════════════════════════════════════════════
# STAGE 1: VALIDATE
# ═══════════════════════════════════════════════════════════════════════════

validate:syntax:
  stage: validate
  image: python:${PYTHON_VERSION}-slim
  script:
    - pip install -r requirements.txt
    - python ci/validate_contract.py --all
  rules:
    - changes:
        - "domains/**/*.yaml"
        - "domains/**/*.yml"

validate:schema:
  stage: validate
  image: python:${PYTHON_VERSION}-slim
  script:
    - pip install -r requirements.txt
    - python ci/validate_against_schema.py --all
  rules:
    - changes:
        - "domains/**/*.yaml"
        - "domains/**/*.yml"

validate:quality-rules:
  stage: validate
  image: python:${PYTHON_VERSION}-slim
  script:
    - pip install -r requirements.txt
    - python ci/validate_quality_rules.py --all
  rules:
    - changes:
        - "domains/**/quality_rules.yml"

# ═══════════════════════════════════════════════════════════════════════════
# STAGE 2: ANALYZE
# ═══════════════════════════════════════════════════════════════════════════

analyze:breaking-changes:
  stage: analyze
  image: python:${PYTHON_VERSION}-slim
  script:
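    # python:*-slim images do not include git, which ci/detect_breaking_changes.py calls via subprocess
    - apt-get update && apt-get install -y --no-install-recommends git
    # Make sure the target branch ref exists locally (CI clones are often shallow)
    - git fetch origin "+refs/heads/${CI_MERGE_REQUEST_TARGET_BRANCH_NAME}:refs/remotes/origin/${CI_MERGE_REQUEST_TARGET_BRANCH_NAME}"
    - pip install -r requirements.txt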
    - |
      python ci/detect_breaking_changes.py \
        --base-ref origin/${CI_MERGE_REQUEST_TARGET_BRANCH_NAME} \
        --head-ref HEAD \
        --output report.json
  artifacts:
    reports:
      dotenv: breaking_changes.env
    paths:
      - report.json
    expire_in: 1 week
  rules:
    - if: '$CI_PIPELINE_SOURCE == "merge_request_event"'
      changes:
        - "domains/**/*.yaml"

analyze:impact:
  stage: analyze
  image: python:${PYTHON_VERSION}-slim
  script:
    - pip install -r requirements.txt
    - python ci/analyze_impact.py --changed-contracts
  rules:
    - if: '$CI_PIPELINE_SOURCE == "merge_request_event"'
      changes:
        - "domains/**/*.yaml"

# ═══════════════════════════════════════════════════════════════════════════
# STAGE 3: VERSION
# ═══════════════════════════════════════════════════════════════════════════

version:check:
  stage: version
  image: python:${PYTHON_VERSION}-slim
  script:
    - pip install -r requirements.txt
    - python ci/check_version_bump.py --changed-contracts
  rules:
    - if: '$CI_PIPELINE_SOURCE == "merge_request_event"'
      changes:
        - "domains/**/*.yaml"

version:suggest:
  stage: version
  image: python:${PYTHON_VERSION}-slim
  script:
    - pip install -r requirements.txt
    - |
      python ci/suggest_version.py \
        --base-ref origin/${CI_MERGE_REQUEST_TARGET_BRANCH_NAME} \
        --output suggested_version.txt
    - cat suggested_version.txt
  artifacts:
    paths:
      - suggested_version.txt
    expire_in: 1 day
  rules:
    - if: '$CI_PIPELINE_SOURCE == "merge_request_event"'
      changes:
        - "domains/**/*.yaml"
  allow_failure: true

# ═══════════════════════════════════════════════════════════════════════════
# STAGE 4: DEPLOY
# ═══════════════════════════════════════════════════════════════════════════

deploy:schema-registry:
  stage: deploy
  image: python:${PYTHON_VERSION}-slim
  script:
    - pip install -r requirements.txt
    - python ci/deploy_to_registry.py --all
  rules:
    - if: '$CI_COMMIT_BRANCH == "main"'
      changes:
        - "domains/**/*.yaml"
  environment:
    name: production
    url: $SCHEMA_REGISTRY_URL

deploy:catalog:
  stage: deploy
  image: python:${PYTHON_VERSION}-slim
  script:
    - pip install -r requirements.txt
    - python ci/sync_to_catalog.py --all
  rules:
    - if: '$CI_COMMIT_BRANCH == "main"'
      changes:
        - "domains/**/*.yaml"
        - "domains/**/*.yml"
  allow_failure: true

deploy:notify:
  stage: deploy
  image: curlimages/curl:latest
  script:
    - |
      curl -X POST "$SLACK_WEBHOOK_URL" \
        -H 'Content-type: application/json' \
        -d '{
          "text": "📋 Data Contracts updated",
          "blocks": [
            {
              "type": "section",
              "text": {
                "type": "mrkdwn",
                "text": "New contract versions deployed to production.\n*Commit:* '"${CI_COMMIT_SHORT_SHA}"'\n*Author:* '"${CI_COMMIT_AUTHOR}"'"
              }
            }
          ]
        }'
  rules:
    - if: '$CI_COMMIT_BRANCH == "main"'
      changes:
        - "domains/**/*.yaml"

Pipeline Stages Detailed

Stage 1: Validate

validate:syntax

Checks the YAML syntax of contract files.

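# ci/validate_contract.py
import argparse
import sys
from pathlib import Path

import yaml

def validate_yaml_syntax(file_path: Path) -> bool:
    """Return True if the file parses as YAML; print the error otherwise"""
    try:
        with open(file_path, encoding="utf-8") as f:
            yaml.safe_load(f)
        return True
    except (OSError, yaml.YAMLError) as e:
        print(f"❌ {file_path}: {e}")
        return False

def main():
    parser = argparse.ArgumentParser(description="Validate YAML syntax of data contracts")
    parser.add_argument("paths", nargs="*", type=Path, help="contract files to check")
    parser.add_argument("--all", action="store_true", help="check every contract.yaml under domains/")
    args = parser.parse_args()

    # Explicit paths take precedence; --all (or no arguments) scans the whole tree
    contracts = args.paths or list(Path("domains").rglob("contract.yaml"))

    errors = []
    for contract in contracts:
        if not validate_yaml_syntax(contract):
            errors.append(contract)

    if errors:
        print(f"\n{len(errors)} contracts have syntax errors")
        sys.exit(1)

    print(f"✓ All {len(contracts)} contracts have valid syntax")

if __name__ == "__main__":
    main()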

validate:schema

Validates each contract against the JSON Schema.

# ci/validate_against_schema.py
import json
import sys
from pathlib import Path

import yaml
from jsonschema import validate, ValidationError

def load_contract_schema() -> dict:
    """Load JSON Schema for contracts"""
    with open("schemas/contract-schema.json") as f:
        return json.load(f)

def validate_contract(contract_path: Path, schema: dict) -> list:
    """Validate contract against schema"""
    errors = []

    with open(contract_path) as f:
        contract = yaml.safe_load(f)

    try:
        validate(instance=contract, schema=schema)
    except ValidationError as e:
        errors.append(f"{contract_path}: {e.message}")

    return errors

def main():
    schema = load_contract_schema()
    contracts = list(Path("domains").rglob("contract.yaml"))

    all_errors = []
    for contract in contracts:
        errors = validate_contract(contract, schema)
        all_errors.extend(errors)

    if all_errors:
        for error in all_errors:
            print(f"❌ {error}")
        sys.exit(1)

    print(f"✓ All {len(contracts)} contracts conform to schema")

if __name__ == "__main__":
    main()

Stage 2: Analyze

analyze:breaking-changes

Detects breaking changes between contract versions.

# ci/detect_breaking_changes.py
import yaml
import subprocess
from enum import Enum
from pathlib import Path
from pydantic import BaseModel

class ChangeType(str, Enum):
    BREAKING = "breaking"
    NON_BREAKING = "non_breaking"
    PATCH = "patch"

class Change(BaseModel):
    """Represents a change detected in contract."""
    contract: str
    field: str
    change_type: ChangeType
    description: str

    model_config = {
        "use_enum_values": True,
    }

def get_file_at_ref(file_path: str, ref: str) -> str | None:
    """Get file content at a specific git ref, or None if git cannot read it"""
    result = subprocess.run(
        ["git", "show", f"{ref}:{file_path}"],
        capture_output=True,
        text=True
    )
    return result.stdout if result.returncode == 0 else None

def detect_breaking_changes(old_contract: dict, new_contract: dict) -> list[Change]:
    """Detect breaking changes between contract versions"""
    changes = []

    old_fields = {f['name']: f for f in old_contract.get('schema', {}).get('fields', [])}
    new_fields = {f['name']: f for f in new_contract.get('schema', {}).get('fields', [])}

    # Removed fields = BREAKING
    for field_name in old_fields:
        if field_name not in new_fields:
            changes.append(Change(
                contract=old_contract['metadata']['name'],
                field=field_name,
                change_type=ChangeType.BREAKING,
                description=f"Field '{field_name}' was removed"
            ))

    # Type changes = BREAKING
    for field_name, new_field in new_fields.items():
        if field_name in old_fields:
            old_field = old_fields[field_name]
            if old_field.get('type') != new_field.get('type'):
                changes.append(Change(
                    contract=old_contract['metadata']['name'],
                    field=field_name,
                    change_type=ChangeType.BREAKING,
                    description=f"Field '{field_name}' type changed from {old_field.get('type')} to {new_field.get('type')}"
                ))

            # Required: false -> true = BREAKING
            if not old_field.get('required') and new_field.get('required'):
                changes.append(Change(
                    contract=old_contract['metadata']['name'],
                    field=field_name,
                    change_type=ChangeType.BREAKING,
                    description=f"Field '{field_name}' became required"
                ))

    # New fields: required = BREAKING, optional = NON_BREAKING
    for field_name, new_field in new_fields.items():
        if field_name not in old_fields:
            if new_field.get('required', False):
                changes.append(Change(
                    contract=old_contract['metadata']['name'],
                    field=field_name,
                    change_type=ChangeType.BREAKING,
                    description=f"New required field '{field_name}' added"
                ))
            else:
                changes.append(Change(
                    contract=old_contract['metadata']['name'],
                    field=field_name,
                    change_type=ChangeType.NON_BREAKING,
                    description=f"New optional field '{field_name}' added"
                ))

    return changes

def main():
    # Parse arguments and run detection
    # Output results as JSON
    pass
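
The `main()` stub above leaves the output step open, while the `analyze:breaking-changes` job expects both `report.json` and a `breaking_changes.env` dotenv artifact. A minimal sketch of that step might look like the following. It assumes each change is a plain dict (for example, the result of `Change.model_dump()`). The function name `write_outputs` and the variable names `HAS_BREAKING_CHANGES` and `BREAKING_CHANGES_COUNT` are hypothetical and do not come from the original pipeline.

```python
import json
from pathlib import Path

def write_outputs(changes: list[dict], report_path: Path, env_path: Path) -> bool:
    """Write the JSON report and a dotenv summary; return True if any change is breaking."""
    breaking = [c for c in changes if c["change_type"] == "breaking"]

    report = {
        "total": len(changes),
        "breaking": len(breaking),
        "changes": changes,
    }
    report_path.write_text(
        json.dumps(report, indent=2, ensure_ascii=False), encoding="utf-8"
    )

    # A dotenv artifact is a plain list of KEY=value lines.
    # The variable names below are assumptions made for this sketch.
    env_path.write_text(
        f"HAS_BREAKING_CHANGES={'true' if breaking else 'false'}\n"
        f"BREAKING_CHANGES_COUNT={len(breaking)}\n",
        encoding="utf-8",
    )
    return bool(breaking)
```

Returning a boolean leaves it to the caller to decide whether breaking changes should fail the job or only be reported.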

Stage 3: Version

version:check

Checks that the version bump matches the detected changes.

# ci/check_version_bump.py
import yaml
from packaging import version
from pathlib import Path

# ChangeType is defined in the sibling script; ci/ is on sys.path when this file is run directly
from detect_breaking_changes import ChangeType

def check_version_bump(old_contract: dict, new_contract: dict, changes: list) -> tuple[bool, str]:
    """Check if version bump is correct for changes"""

    old_version = version.parse(old_contract.get('contract_version', '0.0.0'))
    new_version = version.parse(new_contract.get('contract_version', '0.0.0'))

    has_breaking = any(c.change_type == ChangeType.BREAKING for c in changes)
    has_non_breaking = any(c.change_type == ChangeType.NON_BREAKING for c in changes)

    if has_breaking:
        # Must bump MAJOR
        if new_version.major <= old_version.major:
            return False, f"Breaking changes require MAJOR version bump: {old_version} -> {old_version.major + 1}.0.0"
    elif has_non_breaking:
        # Must bump at least MINOR (a MAJOR bump also satisfies this)
        if (new_version.major, new_version.minor) <= (old_version.major, old_version.minor):
            return False, f"Non-breaking changes require MINOR version bump: {old_version} -> {old_version.major}.{old_version.minor + 1}.0"
    else:
        # PATCH for description changes
        if new_version <= old_version:
            return False, "Changes require at least PATCH version bump"

    return True, f"Version bump OK: {old_version} -> {new_version}"
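
The same rules can be turned around to propose a version instead of checking one. The sketch below illustrates what a suggestion step such as `ci/suggest_version.py` might compute. That script is not shown in this guide, so the function name `suggest_version` and its signature are assumptions. It expects a plain `MAJOR.MINOR.PATCH` string.

```python
def suggest_version(current: str, has_breaking: bool, has_non_breaking: bool) -> str:
    """Suggest the next MAJOR.MINOR.PATCH version for the given kinds of change."""
    major, minor, patch = (int(part) for part in current.split("."))

    if has_breaking:
        # Breaking change: bump MAJOR, reset MINOR and PATCH
        return f"{major + 1}.0.0"
    if has_non_breaking:
        # Backward-compatible addition: bump MINOR, reset PATCH
        return f"{major}.{minor + 1}.0"
    # Anything else (for example description edits): bump PATCH
    return f"{major}.{minor}.{patch + 1}"


print(suggest_version("1.2.0", has_breaking=False, has_non_breaking=True))  # 1.3.0
```

Breaking changes take precedence, so a merge request that mixes breaking and non-breaking changes still gets a MAJOR suggestion.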

Stage 4: Deploy

deploy:schema-registry

Publishes contract schemas to the Schema Registry.

# ci/deploy_to_registry.py
import json
import os
from pathlib import Path

import requests
import yaml

SCHEMA_REGISTRY_URL = os.environ.get('SCHEMA_REGISTRY_URL', 'http://localhost:8081')

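def contract_to_avro_schema(contract: dict) -> dict:
    """Convert contract schema to Avro schema"""
    # NOTE: map_type_to_avro() is not defined in this excerpt; it must map a
    # contract field type (and its required flag) to an Avro type.
    fields = []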
    for field in contract['schema']['fields']:
        avro_field = {
            "name": field['name'],
            "type": map_type_to_avro(field['type'], field.get('required', True))
        }
        if field.get('default') is not None:
            avro_field['default'] = field['default']
        fields.append(avro_field)

    return {
        "type": "record",
        "name": contract['metadata']['name'],
        "namespace": contract['metadata']['namespace'],
        "fields": fields
    }

def deploy_schema(contract_path: Path):
    """Deploy contract schema to Schema Registry"""
    with open(contract_path) as f:
        contract = yaml.safe_load(f)

    avro_schema = contract_to_avro_schema(contract)
    subject = f"{contract['metadata']['namespace']}.{contract['metadata']['name']}-value"

    response = requests.post(
        f"{SCHEMA_REGISTRY_URL}/subjects/{subject}/versions",
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
        json={"schema": json.dumps(avro_schema)},
        timeout=30,
    )

    if response.status_code in [200, 201]:
        # The registry responds with the global schema ID, not the subject version
        print(f"✓ Deployed {subject} (schema id {response.json().get('id')})")
    else:
        print(f"❌ Failed to deploy {subject}: {response.text}")
        return False

    return True
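
`contract_to_avro_schema` relies on a `map_type_to_avro` helper that is not shown here. One possible shape for it is sketched below. The contract type names in `AVRO_TYPES` are hypothetical, because this guide does not list the contract type vocabulary, so the table would need to be adjusted to the real one.

```python
# Hypothetical contract-type -> Avro-type table; replace with the real vocabulary.
AVRO_TYPES = {
    "string": "string",
    "integer": "int",
    "long": "long",
    "float": "float",
    "double": "double",
    "boolean": "boolean",
    "bytes": "bytes",
    "timestamp": {"type": "long", "logicalType": "timestamp-millis"},
    "date": {"type": "int", "logicalType": "date"},
}

def map_type_to_avro(contract_type: str, required: bool = True):
    """Map a contract field type to an Avro type; optional fields become nullable unions."""
    try:
        avro_type = AVRO_TYPES[contract_type.lower()]
    except KeyError:
        raise ValueError(f"Unsupported contract type: {contract_type!r}") from None

    # Listing "null" first keeps a null default valid for the union
    return avro_type if required else ["null", avro_type]
```

Raising on unknown types, rather than silently falling back to `string`, makes an unmapped type fail the deploy job instead of publishing a misleading schema.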

MR Labels & Automation

Automatic Labels

# .gitlab/merge_request_templates/contract.md
## Contract Change

**Contract:** <!-- e.g., sales/orders -->
**Change Type:** <!-- breaking / non-breaking / patch -->

### Changes
- [ ] Schema changes
- [ ] Quality rules changes
- [ ] SLA changes
- [ ] Metadata changes

### Checklist
- [ ] Version bumped appropriately
- [ ] Consumers notified (for breaking changes)
- [ ] Runbook updated (if needed)
- [ ] Tests passed

/label ~"data-contract" ~"needs-review"

Bot Comments

The pipeline automatically adds a comment to the MR:

## 🤖 Contract Analysis Report

### Changes Detected

| Contract | Change Type | Details |
|----------|-------------|---------|
| sales/orders | 🟡 Non-breaking | Added field: `discount` |
| sales/orders | 🟡 Non-breaking | Added field: `promo_code` |

### Version Recommendation

Current: `1.2.0` → Suggested: `1.3.0` (MINOR bump)

### Affected Consumers

| Consumer | Criticality | Contact |
|----------|-------------|---------|
| analytics_team | High | @analytics |
| ml_team | Medium | @ml |

---
*This comment is auto-generated by CI pipeline*
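
A comment like the one above could be assembled from the detected changes with a small rendering function. This is only a sketch: `render_report`, the `LABELS` table, and the dict-shaped changes are assumptions, and only the non-breaking marker is taken from the sample comment.

```python
# Hypothetical labels; only the non-breaking marker appears in the sample comment.
LABELS = {
    "breaking": "🔴 Breaking",
    "non_breaking": "🟡 Non-breaking",
    "patch": "🟢 Patch",
}

def render_report(changes: list[dict], current: str, suggested: str) -> str:
    """Build the Markdown body of the MR comment from detected changes."""
    lines = [
        "## 🤖 Contract Analysis Report",
        "",
        "### Changes Detected",
        "",
        "| Contract | Change Type | Details |",
        "|----------|-------------|---------|",
    ]
    for change in changes:
        # Fall back to the raw change type if no label is defined for it
        label = LABELS.get(change["change_type"], change["change_type"])
        lines.append(f"| {change['contract']} | {label} | {change['description']} |")

    lines += [
        "",
        "### Version Recommendation",
        "",
        f"Current: `{current}` → Suggested: `{suggested}`",
        "",
        "---",
        "*This comment is auto-generated by CI pipeline*",
    ]
    return "\n".join(lines)
```

The resulting string could then be posted to the merge request, for example through the GitLab merge request notes API, using a token supplied as a CI/CD variable.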

Troubleshooting

Pipeline Fails on validate:syntax

# Run locally
python ci/validate_contract.py domains/sales/orders/contract.yaml

# Common issues:
# - Incorrect indentation
# - Missing quotes around special characters
# - Invalid YAML syntax

Pipeline Fails on analyze:breaking-changes

# View the breaking changes report
cat report.json | jq '.'

# If breaking changes are intentional:
# 1. Bump MAJOR version
# 2. Add /label ~"breaking-change" to MR
# 3. Get explicit approval from all consumers

Pipeline Fails on version:check

# Check current and required version
python ci/suggest_version.py --contract domains/sales/orders/contract.yaml

# Fix: Update contract_version in contract.yaml

Version: 1.0. Last updated: January 24, 2026