Управление mTLS сертификатами¶
Обзор¶
API Gateway использует mutual TLS (mTLS) для аутентификации издателей данных.
Принцип: Один сертификат = Один топик = Одна команда
Это обеспечивает: - ✅ Изоляцию между командами - ✅ Минимальные привилегии (least privilege) - ✅ Простой аудит (кто отправил данные) - ✅ Легкую ротацию при компрометации
Схема именования CN (Common Name)¶
Важно: CN точно соответствует имени топика в Kafka (без суффиксов .raw, _prod, _dlq).
Процесс выдачи сертификата¶
1. Запрос от команды¶
Команда создает тикет в JIRA с информацией:
Тип: Certificate Request
Контракт: sales/orders
Команда: sales-integration
Владелец: ivan.petrov@company.ru
Окружение: production
Срок действия: 1 год
2. Валидация запроса¶
Data Platform team проверяет:
- Контракт существует в GitLab
/contracts - Владелец указан в
contract.yaml - Email совпадает с owner.email в контракте
- Нет активного сертификата для этого топика
3. Генерация сертификата¶
#!/bin/bash
# generate_client_cert.sh
NAMESPACE="sales"
ENTITY="orders"
CN="${NAMESPACE}.${ENTITY}"
VALIDITY_DAYS=365
TEAM="sales-integration"
# Создаем директорию для сертификатов команды
CERT_DIR="./certs/${TEAM}"
mkdir -p "${CERT_DIR}"
# Генерируем приватный ключ
openssl genrsa -out "${CERT_DIR}/${CN}.key" 4096
# Создаем CSR (Certificate Signing Request)
openssl req -new \
-key "${CERT_DIR}/${CN}.key" \
-out "${CERT_DIR}/${CN}.csr" \
-subj "/C=RU/ST=Moscow/L=Moscow/O=Company/OU=${TEAM}/CN=${CN}"
# Подписываем сертификат CA
openssl x509 -req \
-in "${CERT_DIR}/${CN}.csr" \
-CA ./ca/ca.crt \
-CAkey ./ca/ca.key \
-CAcreateserial \
-out "${CERT_DIR}/${CN}.crt" \
-days ${VALIDITY_DAYS} \
-sha256
# Создаем bundle (cert + key)
cat "${CERT_DIR}/${CN}.crt" "${CERT_DIR}/${CN}.key" > "${CERT_DIR}/${CN}.pem"
echo "✅ Certificate generated:"
echo " CN: ${CN}"
echo " Files:"
echo " - ${CERT_DIR}/${CN}.crt (certificate)"
echo " - ${CERT_DIR}/${CN}.key (private key)"
echo " - ${CERT_DIR}/${CN}.pem (bundle)"
echo ""
echo "Valid until:"
openssl x509 -in "${CERT_DIR}/${CN}.crt" -noout -enddate
# Сохраняем метаданные
cat > "${CERT_DIR}/${CN}.metadata.json" <<EOF
{
"cn": "${CN}",
"team": "${TEAM}",
"created_at": "$(date -u +%Y-%m-%dT%H:%M:%SZ)",
"expires_at": "$(openssl x509 -in ${CERT_DIR}/${CN}.crt -noout -enddate | cut -d= -f2)",
"issued_by": "$(whoami)",
"contract_path": "domains/${NAMESPACE}/${ENTITY}/contract.yaml"
}
EOF
4. Регистрация в реестре¶
# Добавляем в реестр сертификатов
cat >> ./certs/registry.csv <<EOF
${CN},${TEAM},$(date +%Y-%m-%d),$(date -d "+${VALIDITY_DAYS} days" +%Y-%m-%d),active
EOF
Формат registry.csv:
cn,team,issued_date,expires_date,status
sales.orders,sales-integration,2026-01-23,2027-01-23,active
warehouse.inventory,warehouse-team,2026-01-20,2027-01-20,active
marketing.leads,marketing-tech,2026-01-15,2027-01-15,active
5. Передача сертификата команде¶
Безопасная передача:
-
Зашифровать приватный ключ паролем:
-
Передать пароль через 1Password/Vault
-
Отправить зашифрованный ключ + сертификат через защищенный канал
-
Никогда не отправлять по email!
6. Документирование¶
Обновить docs/certificates.md:
## Активные сертификаты
| CN | Команда | Expires | Contact |
|----|---------|---------|---------|
| sales.orders | sales-integration | 2027-01-23 | ivan@company.ru |
| warehouse.inventory | warehouse-team | 2027-01-20 | maria@company.ru |
Установка сертификата в приложении¶
Python пример¶
import requests
# Путь к сертификатам
CERT_FILE = "/secure/certs/sales.orders.crt"
KEY_FILE = "/secure/certs/sales.orders.key"
CA_FILE = "/secure/certs/ca.crt"
# Отправка данных
response = requests.post(
"https://data-gateway.company.ru/api/1.0/sales.orders",
data=avro_payload,
headers={
"Content-Type": "application/avro",
"version": "2.1.0",
"batch": "100",
},
cert=(CERT_FILE, KEY_FILE),
verify=CA_FILE,
)
Batch Producer (читает из БД)¶
import requests
# Batch producer читает данные из БД (PostgreSQL/MSSQL)
# и отправляет в API Gateway
#
# НЕ отправляйте напрямую из приложения (1C, Mindbox)!
response = requests.post(
"https://data-gateway.company.ru/api/1.0/sales.orders",
data=avro_payload, # Avro bytes из БД
headers={
"Content-Type": "application/avro",
"version": "2.1.0",
"batch": "100",
},
cert=(
"/secure/certs/sales.orders.crt",
"/secure/certs/sales.orders.key"
),
verify="/secure/certs/ca.crt",
timeout=30
)
assert response.status_code == 201 # Created
Полный пример: examples/1c/batch_producer.py
API Gateway: Проверка сертификата¶
from flask import Flask, request, Response
import logging
app = Flask(__name__)
log = logging.getLogger(__name__)
def get_client_cn(request) -> str:
"""Извлечь CN из client certificate."""
# Nginx передает client cert info через заголовки
# ssl_client_s_dn: CN=sales.orders,OU=sales-integration,O=Company
client_dn = request.headers.get("X-SSL-Client-S-DN", "")
if not client_dn:
# Альтернативно: прямой доступ через request.environ (если Flask видит SSL)
client_dn = request.environ.get("SSL_CLIENT_S_DN", "")
# Парсим CN из DN
for part in client_dn.split(","):
if part.strip().startswith("CN="):
return part.split("=", 1)[1].strip()
return None
def verify_cert_not_revoked(cn: str) -> bool:
"""Проверка, что сертификат не отозван."""
# Проверка в CRL (Certificate Revocation List) или OCSP
# Для простоты - проверка в локальном файле
try:
with open("/etc/data-gateway/revoked.txt", "r") as f:
revoked = [line.strip() for line in f]
return cn not in revoked
except FileNotFoundError:
return True
@app.post("/api/1.0/<topic>")
def ingest(topic: str):
"""
Принять данные от producer.
Проверки (только аутентификация и метаданные!):
1. Client certificate present
2. CN matches topic
3. Certificate not revoked
4. Required headers present
⚠️ API Gateway НЕ валидирует данные!
Все данные принимаются "as-is" и сразу отправляются в Kafka .raw топик.
Валидация происходит ПОТОМ в Quality Validator.
"""
# ═══════════════════════════════════════════════════════════════════
# 1. Проверка client certificate
# ═══════════════════════════════════════════════════════════════════
cn = get_client_cn(request)
log.info(f"Request to /{topic} from CN={cn!r}")
if not cn:
log.warning("Missing client certificate")
return {"detail": "Client certificate required"}, 401
# ═══════════════════════════════════════════════════════════════════
# 2. Проверка соответствия CN и topic
# ═══════════════════════════════════════════════════════════════════
if cn != topic:
log.warning(f"Unauthorized: CN={cn!r} tried to access {topic!r}")
return {
"detail": f"Unauthorized. Certificate CN must match topic name.",
"cn": cn,
"requested_topic": topic,
}, 403
# ═══════════════════════════════════════════════════════════════════
# 3. Проверка, что сертификат не отозван
# ═══════════════════════════════════════════════════════════════════
if not verify_cert_not_revoked(cn):
log.warning(f"Revoked certificate: CN={cn!r}")
return {"detail": "Certificate has been revoked"}, 403
# ═══════════════════════════════════════════════════════════════════
# 4. Проверка обязательных заголовков
# ═══════════════════════════════════════════════════════════════════
version = request.headers.get("version")
batch = request.headers.get("batch")
if not version:
return {"detail": "Missing header: version"}, 400
if not batch:
return {"detail": "Missing header: batch"}, 400
# Опциональные заголовки
shard = request.headers.get("shard", "-1")
producer_id = request.headers.get("producer-id", cn)
trace_id = request.headers.get("trace-id", "")
# ═══════════════════════════════════════════════════════════════════
# 5. Проверка Content-Type
# ═══════════════════════════════════════════════════════════════════
content_type = request.headers.get("Content-Type", "")
if not content_type.startswith("application/avro"):
log.warning(f"Invalid Content-Type: {content_type}")
return {"detail": "Content-Type must be application/avro"}, 400
# ═══════════════════════════════════════════════════════════════════
# 6. Получение body (Avro payload)
# ═══════════════════════════════════════════════════════════════════
body = request.data
if not body:
return {"detail": "Empty body"}, 400
body_size = len(body)
log.info(f"Received {body_size} bytes for {topic}")
# ═══════════════════════════════════════════════════════════════════
# 7. Отправка в Kafka RAW topic (as-is, без трансформации!)
# ═══════════════════════════════════════════════════════════════════
kafka_topic = f"{topic}.raw"
try:
kafka_producer.send(
topic=kafka_topic,
value=body, # Отправляем как есть (Avro bytes)
headers=[
("version", version.encode()),
("batch", batch.encode()),
("shard", shard.encode()),
("client-cn", cn.encode()),
("producer-id", producer_id.encode()),
("trace-id", trace_id.encode()),
("body-size-bytes", str(body_size).encode()),
("ingested-at", datetime.utcnow().isoformat().encode()),
],
)
log.info(f"Successfully sent to {kafka_topic}: {batch} messages")
except Exception as e:
log.exception(f"Kafka send error for {kafka_topic}")
return {"detail": "Internal server error"}, 500
# ═══════════════════════════════════════════════════════════════════
# 8. Success response
# ═══════════════════════════════════════════════════════════════════
return Response(
"Created",
status=201,
headers={
"X-Kafka-Topic": kafka_topic,
"X-Batch-Size": batch,
}
)
@app.get("/health")
def health():
"""Health check endpoint."""
return {"status": "healthy"}
if __name__ == "__main__":
app.run(host="0.0.0.0", port=8080)
Ротация сертификатов¶
Плановая ротация (до истечения срока)¶
За 30 дней до истечения:
- Генерировать новый сертификат с тем же CN
- Передать команде
- Команда обновляет конфигурацию (постепенный rollout)
- После полного rollout - отозвать старый сертификат
Экстренная ротация (компрометация)¶
-
Немедленно отозвать скомпрометированный сертификат:
-
Генерировать новый сертификат
- Экстренная передача команде
- Команда обновляет ASAP
Мониторинг¶
Метрики для отслеживания¶
# Prometheus metrics
certificate_expiry_days = Gauge(
"api_gateway_cert_expiry_days",
"Days until certificate expires",
["cn", "team"]
)
certificate_requests_total = Counter(
"api_gateway_cert_requests_total",
"Total certificate authentication attempts",
["cn", "status"] # status: success, unauthorized, revoked
)
Алерты¶
# Prometheus alerts
- alert: CertificateExpiringSoon
expr: api_gateway_cert_expiry_days < 30
labels:
severity: warning
annotations:
summary: "Certificate {{ $labels.cn }} expires in {{ $value }} days"
- alert: CertificateExpired
expr: api_gateway_cert_expiry_days < 0
labels:
severity: critical
annotations:
summary: "Certificate {{ $labels.cn }} has EXPIRED"
- alert: RevokedCertificateUsage
expr: rate(api_gateway_cert_requests_total{status="revoked"}[5m]) > 0
labels:
severity: high
annotations:
summary: "Revoked certificate {{ $labels.cn }} is still being used"
Security Best Practices¶
- Приватные ключи:
- ✅ Хранить в secure vault (не в git!)
- ✅ Permissions 600 (только owner)
- ✅ Шифровать паролем при передаче
- ❌ Никогда не коммитить в git
-
❌ Никогда не отправлять по email
-
CA (Certificate Authority):
- Приватный ключ CA в HSM или максимально защищенном хранилище
- Offline CA для production
-
Регулярный аудит выданных сертификатов
-
Ротация:
- Максимальный срок: 1 год
- Рекомендуемый: 6 месяцев
-
Автоматические напоминания за 30/14/7 дней
-
Аудит:
- Логировать все попытки аутентификации
- Алертить на неудачные попытки
- Quarterly review всех активных сертификатов
Версия: 1.0 Последнее обновление: 24 января 2026