API Gateway: Примеры отправки данных¶
Обзор¶
API Gateway принимает данные от producers через mTLS и сохраняет их в Kafka.
Endpoint: https://data-gateway.company.ru/api/1.0/{topic}
Требования: - ✅ mTLS сертификат (CN = topic name) - ✅ Apache Avro сериализация - ✅ Обязательные заголовки: version, batch - ✅ Content-Type: application/avro
Процесс отправки¶
1. Приложение сохраняет данные в свою БД → COMMIT
2. Сериализация в Apache Avro
3. POST на API Gateway с mTLS
4. API Gateway → Kafka {topic}.raw
5. Quality Validator → Kafka {topic}.prod / {topic}.dlq
Python¶
Установка¶
Код¶
import io
import requests
from datetime import datetime
import avro.schema
import avro.io
# ═══════════════════════════════════════════════════════════════════════════
# Конфигурация
# ═══════════════════════════════════════════════════════════════════════════
API_GATEWAY_URL = 'https://data-gateway.company.ru/api/1.0/sales.orders'
CERT_FILE = '/secure/certs/sales.orders.crt'
KEY_FILE = '/secure/certs/sales.orders.key'
CA_FILE = '/secure/certs/ca.crt'
CONTRACT_VERSION = '2.1.0'
# Avro схема (из контракта)
AVRO_SCHEMA_JSON = """
{
"type": "record",
"name": "Order",
"namespace": "sales.orders",
"fields": [
{"name": "order_id", "type": "string"},
{"name": "customer_id", "type": "string"},
{"name": "total_amount", "type": "double"},
{"name": "status", "type": "string"},
{"name": "created_at", "type": {"type": "long", "logicalType": "timestamp-millis"}}
]
}
"""
schema = avro.schema.parse(AVRO_SCHEMA_JSON)
# ═══════════════════════════════════════════════════════════════════════════
# Функции
# ═══════════════════════════════════════════════════════════════════════════
def serialize_to_avro(records: list) -> bytes:
"""Сериализовать список записей в Avro."""
writer = avro.io.DatumWriter(schema)
bytes_writer = io.BytesIO()
encoder = avro.io.BinaryEncoder(bytes_writer)
for record in records:
# Преобразуем datetime в timestamp-millis
if isinstance(record.get('created_at'), datetime):
record['created_at'] = int(record['created_at'].timestamp() * 1000)
writer.write(record, encoder)
return bytes_writer.getvalue()
def send_to_api_gateway(records: list) -> bool:
"""Отправить записи в API Gateway."""
# Сериализация
avro_payload = serialize_to_avro(records)
# Заголовки
headers = {
'Content-Type': 'application/avro',
'version': CONTRACT_VERSION,
'batch': str(len(records)),
'producer-id': 'my-app',
'trace-id': f'trace-{datetime.utcnow().strftime("%Y%m%d%H%M%S")}',
}
try:
response = requests.post(
API_GATEWAY_URL,
data=avro_payload,
headers=headers,
cert=(CERT_FILE, KEY_FILE),
verify=CA_FILE,
timeout=30,
)
if response.status_code == 201:
print(f"✅ Successfully sent {len(records)} records")
return True
else:
print(f"❌ Error: {response.status_code} - {response.text}")
return False
except requests.exceptions.SSLError as e:
print(f"❌ mTLS error: {e}")
return False
except Exception as e:
print(f"❌ Error: {e}")
return False
# ═══════════════════════════════════════════════════════════════════════════
# Пример использования
# ═══════════════════════════════════════════════════════════════════════════
if __name__ == '__main__':
orders = [
{
'order_id': 'ord_123',
'customer_id': 'cust_456',
'total_amount': 99.99,
'status': 'confirmed',
'created_at': datetime.utcnow(),
},
{
'order_id': 'ord_124',
'customer_id': 'cust_457',
'total_amount': 149.50,
'status': 'pending',
'created_at': datetime.utcnow(),
},
]
success = send_to_api_gateway(orders)
print(f"Result: {success}")
Go¶
Установка¶
Код¶
package main
import (
"bytes"
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"net/http"
"time"
"github.com/linkedin/goavro/v2"
)
const (
apiGatewayURL = "https://data-gateway.company.ru/api/1.0/sales.orders"
certFile = "/secure/certs/sales.orders.crt"
keyFile = "/secure/certs/sales.orders.key"
caFile = "/secure/certs/ca.crt"
version = "2.1.0"
)
// Avro схема
const avroSchema = `
{
"type": "record",
"name": "Order",
"namespace": "sales.orders",
"fields": [
{"name": "order_id", "type": "string"},
{"name": "customer_id", "type": "string"},
{"name": "total_amount", "type": "double"},
{"name": "status", "type": "string"},
{"name": "created_at", "type": {"type": "long", "logicalType": "timestamp-millis"}}
]
}
`
func serializeToAvro(records []map[string]interface{}) ([]byte, error) {
codec, err := goavro.NewCodec(avroSchema)
if err != nil {
return nil, fmt.Errorf("failed to create codec: %w", err)
}
var buf bytes.Buffer
for _, record := range records {
// Преобразуем created_at в timestamp-millis
if createdAt, ok := record["created_at"].(time.Time); ok {
record["created_at"] = createdAt.UnixMilli()
}
binary, err := codec.BinaryFromNative(nil, record)
if err != nil {
return nil, fmt.Errorf("failed to serialize: %w", err)
}
buf.Write(binary)
}
return buf.Bytes(), nil
}
func sendToAPIGateway(records []map[string]interface{}) error {
// Сериализация в Avro
avroPayload, err := serializeToAvro(records)
if err != nil {
return fmt.Errorf("serialization error: %w", err)
}
// Загрузка сертификатов
cert, err := tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
return fmt.Errorf("failed to load client cert: %w", err)
}
// Загрузка CA
caCert, err := ioutil.ReadFile(caFile)
if err != nil {
return fmt.Errorf("failed to read CA cert: %w", err)
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
// Настройка TLS
tlsConfig := &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
}
// HTTP клиент
client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsConfig,
},
Timeout: 30 * time.Second,
}
// Создание запроса
req, err := http.NewRequest("POST", apiGatewayURL, bytes.NewReader(avroPayload))
if err != nil {
return fmt.Errorf("failed to create request: %w", err)
}
// Заголовки
req.Header.Set("Content-Type", "application/avro")
req.Header.Set("version", version)
req.Header.Set("batch", fmt.Sprintf("%d", len(records)))
req.Header.Set("producer-id", "my-app-go")
req.Header.Set("trace-id", fmt.Sprintf("trace-%d", time.Now().Unix()))
// Отправка
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("request failed: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusCreated {
body, _ := ioutil.ReadAll(resp.Body)
return fmt.Errorf("unexpected status: %d - %s", resp.StatusCode, string(body))
}
fmt.Printf("✅ Successfully sent %d records\n", len(records))
return nil
}
func main() {
orders := []map[string]interface{}{
{
"order_id": "ord_123",
"customer_id": "cust_456",
"total_amount": 99.99,
"status": "confirmed",
"created_at": time.Now(),
},
{
"order_id": "ord_124",
"customer_id": "cust_457",
"total_amount": 149.50,
"status": "pending",
"created_at": time.Now(),
},
}
if err := sendToAPIGateway(orders); err != nil {
fmt.Printf("❌ Error: %v\n", err)
}
}
Batch Producer (PostgreSQL/MSSQL → API Gateway)¶
Типичный сценарий: чтение данных из БД владельца данных (PostgreSQL, MSSQL) и отправка в API Gateway.
┌──────────────┐ ┌──────────┐ ┌─────────────┐
│ PostgreSQL/ │────▶│ Batch │────▶│ API Gateway │
│ MSSQL │ │ or CDC │ │ (mTLS) │
└──────────────┘ └──────────┘ └─────────────┘
Реализация:
- CDC (real-time): examples/1c/README.md - Debezium connector
- Batch ETL: examples/1c/batch_producer.py - Python скрипт
Java¶
Maven Dependencies¶
<dependencies>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.3</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.14</version>
</dependency>
</dependencies>
Код¶
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.ssl.SSLContextBuilder;
import javax.net.ssl.SSLContext;
import java.io.ByteArrayOutputStream;
import java.io.FileInputStream;
import java.security.KeyStore;
import java.util.ArrayList;
import java.util.List;
public class DataPlatformClient {
private static final String API_GATEWAY_URL = "https://data-gateway.company.ru/api/1.0/sales.orders";
private static final String CERT_FILE = "/secure/certs/sales.orders.p12";
private static final String CERT_PASSWORD = "secret";
private static final String VERSION = "2.1.0";
private static final String AVRO_SCHEMA =
"{" +
" \"type\": \"record\"," +
" \"name\": \"Order\"," +
" \"namespace\": \"sales.orders\"," +
" \"fields\": [" +
" {\"name\": \"order_id\", \"type\": \"string\"}," +
" {\"name\": \"customer_id\", \"type\": \"string\"}," +
" {\"name\": \"total_amount\", \"type\": \"double\"}," +
" {\"name\": \"status\", \"type\": \"string\"}" +
" ]" +
"}";
public static byte[] serializeToAvro(List<GenericRecord> records) throws Exception {
Schema schema = new Schema.Parser().parse(AVRO_SCHEMA);
DatumWriter<GenericRecord> writer = new SpecificDatumWriter<>(schema);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null);
for (GenericRecord record : records) {
writer.write(record, encoder);
}
encoder.flush();
return outputStream.toByteArray();
}
public static boolean sendToAPIGateway(List<GenericRecord> records) throws Exception {
// Сериализация в Avro
byte[] avroPayload = serializeToAvro(records);
// Загрузка сертификата
KeyStore keyStore = KeyStore.getInstance("PKCS12");
keyStore.load(new FileInputStream(CERT_FILE), CERT_PASSWORD.toCharArray());
SSLContext sslContext = SSLContextBuilder
.create()
.loadKeyMaterial(keyStore, CERT_PASSWORD.toCharArray())
.build();
SSLConnectionSocketFactory sslSocketFactory = new SSLConnectionSocketFactory(sslContext);
CloseableHttpClient httpClient = HttpClients.custom()
.setSSLSocketFactory(sslSocketFactory)
.build();
// Создание запроса
HttpPost post = new HttpPost(API_GATEWAY_URL);
post.setEntity(new ByteArrayEntity(avroPayload));
post.setHeader("Content-Type", "application/avro");
post.setHeader("version", VERSION);
post.setHeader("batch", String.valueOf(records.size()));
post.setHeader("producer-id", "my-java-app");
// Отправка
CloseableHttpResponse response = httpClient.execute(post);
int statusCode = response.getStatusLine().getStatusCode();
response.close();
httpClient.close();
if (statusCode == 201) {
System.out.println("✅ Successfully sent " + records.size() + " records");
return true;
} else {
System.out.println("❌ Error: " + statusCode);
return false;
}
}
public static void main(String[] args) throws Exception {
Schema schema = new Schema.Parser().parse(AVRO_SCHEMA);
List<GenericRecord> orders = new ArrayList<>();
GenericRecord order1 = new GenericData.Record(schema);
order1.put("order_id", "ord_123");
order1.put("customer_id", "cust_456");
order1.put("total_amount", 99.99);
order1.put("status", "confirmed");
orders.add(order1);
sendToAPIGateway(orders);
}
}
Заголовки (Headers)¶
Обязательные¶
| Заголовок | Описание | Пример |
|---|---|---|
Content-Type | Тип контента | application/avro |
version | Версия контракта | 2.1.0 |
batch | Количество записей | 100 |
Опциональные (рекомендуются)¶
| Заголовок | Описание | Пример |
|---|---|---|
producer-id | ID producer приложения | my-app-v1 |
trace-id | ID для трейсинга | trace-20260123120000 |
shard | Shard ID (для партиционирования) | 0 |
Коды ответов¶
| Код | Описание | Действие |
|---|---|---|
201 | Успешно принято | ✅ Данные в Kafka |
400 | Неверный запрос | Проверьте заголовки и payload |
401 | Нет сертификата | Настройте mTLS |
403 | Неправильный CN | CN должен совпадать с топиком |
500 | Ошибка сервера | Повторите позже |
Troubleshooting¶
Ошибка: SSLError - certificate verify failed¶
Причина: Неправильный CA сертификат
Решение:
# Скачайте актуальный CA cert от Data Platform team
curl https://data-gateway.company.ru/ca.crt -o /secure/certs/ca.crt
Ошибка: 403 Unauthorized¶
Причина: CN в сертификате не совпадает с топиком
Решение:
# Проверьте CN в вашем сертификате
openssl x509 -in /secure/certs/sales.orders.crt -noout -subject
# Должно быть: subject=CN=sales.orders
Ошибка: 400 Missing headers¶
Причина: Не указаны обязательные заголовки
Решение: Убедитесь что указаны version и batch