Перейти к содержанию

Управление mTLS сертификатами

Обзор

API Gateway использует mutual TLS (mTLS) для аутентификации издателей данных.

Принцип: Один сертификат = Один топик = Одна команда

Это обеспечивает: - ✅ Изоляцию между командами - ✅ Минимальные привилегии (least privilege) - ✅ Простой аудит (кто отправил данные) - ✅ Легкую ротацию при компрометации

Схема именования CN (Common Name)

CN = {namespace}.{entity}

Примеры:
  sales.orders
  warehouse.inventory
  marketing.leads

Важно: CN точно соответствует имени топика в Kafka (без суффиксов .raw, _prod, _dlq).

Процесс выдачи сертификата

1. Запрос от команды

Команда создает тикет в JIRA с информацией:

Тип: Certificate Request
Контракт: sales/orders
Команда: sales-integration
Владелец: ivan.petrov@company.ru
Окружение: production
Срок действия: 1 год

2. Валидация запроса

Data Platform team проверяет:

  • Контракт существует в GitLab /contracts
  • Владелец указан в contract.yaml
  • Email совпадает с owner.email в контракте
  • Нет активного сертификата для этого топика

3. Генерация сертификата

#!/bin/bash
# generate_client_cert.sh

NAMESPACE="sales"
ENTITY="orders"
CN="${NAMESPACE}.${ENTITY}"
VALIDITY_DAYS=365
TEAM="sales-integration"

# Создаем директорию для сертификатов команды
CERT_DIR="./certs/${TEAM}"
mkdir -p "${CERT_DIR}"

# Генерируем приватный ключ
openssl genrsa -out "${CERT_DIR}/${CN}.key" 4096

# Создаем CSR (Certificate Signing Request)
openssl req -new \
  -key "${CERT_DIR}/${CN}.key" \
  -out "${CERT_DIR}/${CN}.csr" \
  -subj "/C=RU/ST=Moscow/L=Moscow/O=Company/OU=${TEAM}/CN=${CN}"

# Подписываем сертификат CA
openssl x509 -req \
  -in "${CERT_DIR}/${CN}.csr" \
  -CA ./ca/ca.crt \
  -CAkey ./ca/ca.key \
  -CAcreateserial \
  -out "${CERT_DIR}/${CN}.crt" \
  -days ${VALIDITY_DAYS} \
  -sha256

# Создаем bundle (cert + key)
cat "${CERT_DIR}/${CN}.crt" "${CERT_DIR}/${CN}.key" > "${CERT_DIR}/${CN}.pem"

echo "✅ Certificate generated:"
echo "   CN: ${CN}"
echo "   Files:"
echo "     - ${CERT_DIR}/${CN}.crt (certificate)"
echo "     - ${CERT_DIR}/${CN}.key (private key)"
echo "     - ${CERT_DIR}/${CN}.pem (bundle)"
echo ""
echo "Valid until:"
openssl x509 -in "${CERT_DIR}/${CN}.crt" -noout -enddate

# Сохраняем метаданные
cat > "${CERT_DIR}/${CN}.metadata.json" <<EOF
{
  "cn": "${CN}",
  "team": "${TEAM}",
  "created_at": "$(date -u +%Y-%m-%dT%H:%M:%SZ)",
  "expires_at": "$(openssl x509 -in ${CERT_DIR}/${CN}.crt -noout -enddate | cut -d= -f2)",
  "issued_by": "$(whoami)",
  "contract_path": "domains/${NAMESPACE}/${ENTITY}/contract.yaml"
}
EOF

4. Регистрация в реестре

# Добавляем в реестр сертификатов
cat >> ./certs/registry.csv <<EOF
${CN},${TEAM},$(date +%Y-%m-%d),$(date -d "+${VALIDITY_DAYS} days" +%Y-%m-%d),active
EOF

Формат registry.csv:

cn,team,issued_date,expires_date,status
sales.orders,sales-integration,2026-01-23,2027-01-23,active
warehouse.inventory,warehouse-team,2026-01-20,2027-01-20,active
marketing.leads,marketing-tech,2026-01-15,2027-01-15,active

5. Передача сертификата команде

Безопасная передача:

  1. Зашифровать приватный ключ паролем:

    openssl rsa -aes256 \
      -in "${CERT_DIR}/${CN}.key" \
      -out "${CERT_DIR}/${CN}.key.encrypted"
    

  2. Передать пароль через 1Password/Vault

  3. Отправить зашифрованный ключ + сертификат через защищенный канал

  4. Никогда не отправлять по email!

6. Документирование

Обновить docs/certificates.md:

## Активные сертификаты

| CN | Команда | Expires | Contact |
|----|---------|---------|---------|
| sales.orders | sales-integration | 2027-01-23 | ivan@company.ru |
| warehouse.inventory | warehouse-team | 2027-01-20 | maria@company.ru |

Установка сертификата в приложении

Python пример

import requests

# Путь к сертификатам
CERT_FILE = "/secure/certs/sales.orders.crt"
KEY_FILE = "/secure/certs/sales.orders.key"
CA_FILE = "/secure/certs/ca.crt"

# Отправка данных
response = requests.post(
    "https://data-gateway.company.ru/api/1.0/sales.orders",
    data=avro_payload,
    headers={
        "Content-Type": "application/avro",
        "version": "2.1.0",
        "batch": "100",
    },
    cert=(CERT_FILE, KEY_FILE),
    verify=CA_FILE,
)

Batch Producer (читает из БД)

import requests

# Batch producer читает данные из БД (PostgreSQL/MSSQL)
# и отправляет в API Gateway
# 
# НЕ отправляйте напрямую из приложения (1C, Mindbox)!

response = requests.post(
    "https://data-gateway.company.ru/api/1.0/sales.orders",
    data=avro_payload,  # Avro bytes из БД
    headers={
        "Content-Type": "application/avro",
        "version": "2.1.0",
        "batch": "100",
    },
    cert=(
        "/secure/certs/sales.orders.crt",
        "/secure/certs/sales.orders.key"
    ),
    verify="/secure/certs/ca.crt",
    timeout=30
)

assert response.status_code == 201  # Created

Полный пример: examples/1c/batch_producer.py

API Gateway: Проверка сертификата

from flask import Flask, request, Response
import logging

app = Flask(__name__)
log = logging.getLogger(__name__)

def get_client_cn(request) -> str:
    """Извлечь CN из client certificate."""
    # Nginx передает client cert info через заголовки
    # ssl_client_s_dn: CN=sales.orders,OU=sales-integration,O=Company

    client_dn = request.headers.get("X-SSL-Client-S-DN", "")

    if not client_dn:
        # Альтернативно: прямой доступ через request.environ (если Flask видит SSL)
        client_dn = request.environ.get("SSL_CLIENT_S_DN", "")

    # Парсим CN из DN
    for part in client_dn.split(","):
        if part.strip().startswith("CN="):
            return part.split("=", 1)[1].strip()

    return None

def verify_cert_not_revoked(cn: str) -> bool:
    """Проверка, что сертификат не отозван."""
    # Проверка в CRL (Certificate Revocation List) или OCSP
    # Для простоты - проверка в локальном файле
    try:
        with open("/etc/data-gateway/revoked.txt", "r") as f:
            revoked = [line.strip() for line in f]
            return cn not in revoked
    except FileNotFoundError:
        return True

@app.post("/api/1.0/<topic>")
def ingest(topic: str):
    """
    Принять данные от producer.

    Проверки (только аутентификация и метаданные!):
    1. Client certificate present
    2. CN matches topic
    3. Certificate not revoked
    4. Required headers present

    ⚠️ API Gateway НЕ валидирует данные!
    Все данные принимаются "as-is" и сразу отправляются в Kafka .raw топик.
    Валидация происходит ПОТОМ в Quality Validator.
    """

    # ═══════════════════════════════════════════════════════════════════
    # 1. Проверка client certificate
    # ═══════════════════════════════════════════════════════════════════
    cn = get_client_cn(request)
    log.info(f"Request to /{topic} from CN={cn!r}")

    if not cn:
        log.warning("Missing client certificate")
        return {"detail": "Client certificate required"}, 401

    # ═══════════════════════════════════════════════════════════════════
    # 2. Проверка соответствия CN и topic
    # ═══════════════════════════════════════════════════════════════════
    if cn != topic:
        log.warning(f"Unauthorized: CN={cn!r} tried to access {topic!r}")
        return {
            "detail": f"Unauthorized. Certificate CN must match topic name.",
            "cn": cn,
            "requested_topic": topic,
        }, 403

    # ═══════════════════════════════════════════════════════════════════
    # 3. Проверка, что сертификат не отозван
    # ═══════════════════════════════════════════════════════════════════
    if not verify_cert_not_revoked(cn):
        log.warning(f"Revoked certificate: CN={cn!r}")
        return {"detail": "Certificate has been revoked"}, 403

    # ═══════════════════════════════════════════════════════════════════
    # 4. Проверка обязательных заголовков
    # ═══════════════════════════════════════════════════════════════════
    version = request.headers.get("version")
    batch = request.headers.get("batch")

    if not version:
        return {"detail": "Missing header: version"}, 400

    if not batch:
        return {"detail": "Missing header: batch"}, 400

    # Опциональные заголовки
    shard = request.headers.get("shard", "-1")
    producer_id = request.headers.get("producer-id", cn)
    trace_id = request.headers.get("trace-id", "")

    # ═══════════════════════════════════════════════════════════════════
    # 5. Проверка Content-Type
    # ═══════════════════════════════════════════════════════════════════
    content_type = request.headers.get("Content-Type", "")
    if not content_type.startswith("application/avro"):
        log.warning(f"Invalid Content-Type: {content_type}")
        return {"detail": "Content-Type must be application/avro"}, 400

    # ═══════════════════════════════════════════════════════════════════
    # 6. Получение body (Avro payload)
    # ═══════════════════════════════════════════════════════════════════
    body = request.data

    if not body:
        return {"detail": "Empty body"}, 400

    body_size = len(body)
    log.info(f"Received {body_size} bytes for {topic}")

    # ═══════════════════════════════════════════════════════════════════
    # 7. Отправка в Kafka RAW topic (as-is, без трансформации!)
    # ═══════════════════════════════════════════════════════════════════
    kafka_topic = f"{topic}.raw"

    try:
        kafka_producer.send(
            topic=kafka_topic,
            value=body,  # Отправляем как есть (Avro bytes)
            headers=[
                ("version", version.encode()),
                ("batch", batch.encode()),
                ("shard", shard.encode()),
                ("client-cn", cn.encode()),
                ("producer-id", producer_id.encode()),
                ("trace-id", trace_id.encode()),
                ("body-size-bytes", str(body_size).encode()),
                ("ingested-at", datetime.utcnow().isoformat().encode()),
            ],
        )

        log.info(f"Successfully sent to {kafka_topic}: {batch} messages")

    except Exception as e:
        log.exception(f"Kafka send error for {kafka_topic}")
        return {"detail": "Internal server error"}, 500

    # ═══════════════════════════════════════════════════════════════════
    # 8. Success response
    # ═══════════════════════════════════════════════════════════════════
    return Response(
        "Created",
        status=201,
        headers={
            "X-Kafka-Topic": kafka_topic,
            "X-Batch-Size": batch,
        }
    )


@app.get("/health")
def health():
    """Health check endpoint."""
    return {"status": "healthy"}


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8080)

Ротация сертификатов

Плановая ротация (до истечения срока)

За 30 дней до истечения:

  1. Генерировать новый сертификат с тем же CN
  2. Передать команде
  3. Команда обновляет конфигурацию (постепенный rollout)
  4. После полного rollout - отозвать старый сертификат

Экстренная ротация (компрометация)

  1. Немедленно отозвать скомпрометированный сертификат:

    echo "sales.orders" >> /etc/data-gateway/revoked.txt
    # Перезагрузить API Gateway для применения
    

  2. Генерировать новый сертификат

  3. Экстренная передача команде
  4. Команда обновляет ASAP

Мониторинг

Метрики для отслеживания

# Prometheus metrics
certificate_expiry_days = Gauge(
    "api_gateway_cert_expiry_days",
    "Days until certificate expires",
    ["cn", "team"]
)

certificate_requests_total = Counter(
    "api_gateway_cert_requests_total",
    "Total certificate authentication attempts",
    ["cn", "status"]  # status: success, unauthorized, revoked
)

Алерты

# Prometheus alerts
- alert: CertificateExpiringSoon
  expr: api_gateway_cert_expiry_days < 30
  labels:
    severity: warning
  annotations:
    summary: "Certificate {{ $labels.cn }} expires in {{ $value }} days"

- alert: CertificateExpired
  expr: api_gateway_cert_expiry_days < 0
  labels:
    severity: critical
  annotations:
    summary: "Certificate {{ $labels.cn }} has EXPIRED"

- alert: RevokedCertificateUsage
  expr: rate(api_gateway_cert_requests_total{status="revoked"}[5m]) > 0
  labels:
    severity: high
  annotations:
    summary: "Revoked certificate {{ $labels.cn }} is still being used"

Security Best Practices

  1. Приватные ключи:
  2. ✅ Хранить в secure vault (не в git!)
  3. ✅ Permissions 600 (только owner)
  4. ✅ Шифровать паролем при передаче
  5. ❌ Никогда не коммитить в git
  6. ❌ Никогда не отправлять по email

  7. CA (Certificate Authority):

  8. Приватный ключ CA в HSM или максимально защищенном хранилище
  9. Offline CA для production
  10. Регулярный аудит выданных сертификатов

  11. Ротация:

  12. Максимальный срок: 1 год
  13. Рекомендуемый: 6 месяцев
  14. Автоматические напоминания за 30/14/7 дней

  15. Аудит:

  16. Логировать все попытки аутентификации
  17. Алертить на неудачные попытки
  18. Quarterly review всех активных сертификатов

Версия: 1.0 Последнее обновление: 24 января 2026