Перейти к содержанию

API Gateway: Примеры отправки данных

Обзор

API Gateway принимает данные от producers через mTLS и сохраняет их в Kafka.

Endpoint: https://data-gateway.company.ru/api/1.0/{topic}

Требования: - ✅ mTLS сертификат (CN = topic name) - ✅ Apache Avro сериализация - ✅ Обязательные заголовки: version, batch - ✅ Content-Type: application/avro

Процесс отправки

1. Приложение сохраняет данные в свою БД → COMMIT
2. Сериализация в Apache Avro
3. POST на API Gateway с mTLS
4. API Gateway → Kafka {topic}.raw
5. Quality Validator → Kafka {topic}.prod / {topic}.dlq

Python

Установка

pip install avro-python3 requests

Код

import io
import requests
from datetime import datetime
import avro.schema
import avro.io

# ═══════════════════════════════════════════════════════════════════════════
# Конфигурация
# ═══════════════════════════════════════════════════════════════════════════

API_GATEWAY_URL = 'https://data-gateway.company.ru/api/1.0/sales.orders'
CERT_FILE = '/secure/certs/sales.orders.crt'
KEY_FILE = '/secure/certs/sales.orders.key'
CA_FILE = '/secure/certs/ca.crt'
CONTRACT_VERSION = '2.1.0'

# Avro схема (из контракта)
AVRO_SCHEMA_JSON = """
{
  "type": "record",
  "name": "Order",
  "namespace": "sales.orders",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "customer_id", "type": "string"},
    {"name": "total_amount", "type": "double"},
    {"name": "status", "type": "string"},
    {"name": "created_at", "type": {"type": "long", "logicalType": "timestamp-millis"}}
  ]
}
"""

schema = avro.schema.parse(AVRO_SCHEMA_JSON)


# ═══════════════════════════════════════════════════════════════════════════
# Функции
# ═══════════════════════════════════════════════════════════════════════════

def serialize_to_avro(records: list) -> bytes:
    """Сериализовать список записей в Avro."""
    writer = avro.io.DatumWriter(schema)
    bytes_writer = io.BytesIO()
    encoder = avro.io.BinaryEncoder(bytes_writer)

    for record in records:
        # Преобразуем datetime в timestamp-millis
        if isinstance(record.get('created_at'), datetime):
            record['created_at'] = int(record['created_at'].timestamp() * 1000)

        writer.write(record, encoder)

    return bytes_writer.getvalue()


def send_to_api_gateway(records: list) -> bool:
    """Отправить записи в API Gateway."""

    # Сериализация
    avro_payload = serialize_to_avro(records)

    # Заголовки
    headers = {
        'Content-Type': 'application/avro',
        'version': CONTRACT_VERSION,
        'batch': str(len(records)),
        'producer-id': 'my-app',
        'trace-id': f'trace-{datetime.utcnow().strftime("%Y%m%d%H%M%S")}',
    }

    try:
        response = requests.post(
            API_GATEWAY_URL,
            data=avro_payload,
            headers=headers,
            cert=(CERT_FILE, KEY_FILE),
            verify=CA_FILE,
            timeout=30,
        )

        if response.status_code == 201:
            print(f"✅ Successfully sent {len(records)} records")
            return True
        else:
            print(f"❌ Error: {response.status_code} - {response.text}")
            return False

    except requests.exceptions.SSLError as e:
        print(f"❌ mTLS error: {e}")
        return False
    except Exception as e:
        print(f"❌ Error: {e}")
        return False


# ═══════════════════════════════════════════════════════════════════════════
# Пример использования
# ═══════════════════════════════════════════════════════════════════════════

if __name__ == '__main__':
    orders = [
        {
            'order_id': 'ord_123',
            'customer_id': 'cust_456',
            'total_amount': 99.99,
            'status': 'confirmed',
            'created_at': datetime.utcnow(),
        },
        {
            'order_id': 'ord_124',
            'customer_id': 'cust_457',
            'total_amount': 149.50,
            'status': 'pending',
            'created_at': datetime.utcnow(),
        },
    ]

    success = send_to_api_gateway(orders)
    print(f"Result: {success}")

Go

Установка

go get github.com/linkedin/goavro/v2

Код

package main

import (
    "bytes"
    "crypto/tls"
    "crypto/x509"
    "fmt"
    "io/ioutil"
    "net/http"
    "time"

    "github.com/linkedin/goavro/v2"
)

const (
    apiGatewayURL = "https://data-gateway.company.ru/api/1.0/sales.orders"
    certFile      = "/secure/certs/sales.orders.crt"
    keyFile       = "/secure/certs/sales.orders.key"
    caFile        = "/secure/certs/ca.crt"
    version       = "2.1.0"
)

// Avro схема
const avroSchema = `
{
  "type": "record",
  "name": "Order",
  "namespace": "sales.orders",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "customer_id", "type": "string"},
    {"name": "total_amount", "type": "double"},
    {"name": "status", "type": "string"},
    {"name": "created_at", "type": {"type": "long", "logicalType": "timestamp-millis"}}
  ]
}
`

func serializeToAvro(records []map[string]interface{}) ([]byte, error) {
    codec, err := goavro.NewCodec(avroSchema)
    if err != nil {
        return nil, fmt.Errorf("failed to create codec: %w", err)
    }

    var buf bytes.Buffer

    for _, record := range records {
        // Преобразуем created_at в timestamp-millis
        if createdAt, ok := record["created_at"].(time.Time); ok {
            record["created_at"] = createdAt.UnixMilli()
        }

        binary, err := codec.BinaryFromNative(nil, record)
        if err != nil {
            return nil, fmt.Errorf("failed to serialize: %w", err)
        }

        buf.Write(binary)
    }

    return buf.Bytes(), nil
}

func sendToAPIGateway(records []map[string]interface{}) error {
    // Сериализация в Avro
    avroPayload, err := serializeToAvro(records)
    if err != nil {
        return fmt.Errorf("serialization error: %w", err)
    }

    // Загрузка сертификатов
    cert, err := tls.LoadX509KeyPair(certFile, keyFile)
    if err != nil {
        return fmt.Errorf("failed to load client cert: %w", err)
    }

    // Загрузка CA
    caCert, err := ioutil.ReadFile(caFile)
    if err != nil {
        return fmt.Errorf("failed to read CA cert: %w", err)
    }

    caCertPool := x509.NewCertPool()
    caCertPool.AppendCertsFromPEM(caCert)

    // Настройка TLS
    tlsConfig := &tls.Config{
        Certificates: []tls.Certificate{cert},
        RootCAs:      caCertPool,
    }

    // HTTP клиент
    client := &http.Client{
        Transport: &http.Transport{
            TLSClientConfig: tlsConfig,
        },
        Timeout: 30 * time.Second,
    }

    // Создание запроса
    req, err := http.NewRequest("POST", apiGatewayURL, bytes.NewReader(avroPayload))
    if err != nil {
        return fmt.Errorf("failed to create request: %w", err)
    }

    // Заголовки
    req.Header.Set("Content-Type", "application/avro")
    req.Header.Set("version", version)
    req.Header.Set("batch", fmt.Sprintf("%d", len(records)))
    req.Header.Set("producer-id", "my-app-go")
    req.Header.Set("trace-id", fmt.Sprintf("trace-%d", time.Now().Unix()))

    // Отправка
    resp, err := client.Do(req)
    if err != nil {
        return fmt.Errorf("request failed: %w", err)
    }
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusCreated {
        body, _ := ioutil.ReadAll(resp.Body)
        return fmt.Errorf("unexpected status: %d - %s", resp.StatusCode, string(body))
    }

    fmt.Printf("✅ Successfully sent %d records\n", len(records))
    return nil
}

func main() {
    orders := []map[string]interface{}{
        {
            "order_id":     "ord_123",
            "customer_id":  "cust_456",
            "total_amount": 99.99,
            "status":       "confirmed",
            "created_at":   time.Now(),
        },
        {
            "order_id":     "ord_124",
            "customer_id":  "cust_457",
            "total_amount": 149.50,
            "status":       "pending",
            "created_at":   time.Now(),
        },
    }

    if err := sendToAPIGateway(orders); err != nil {
        fmt.Printf("❌ Error: %v\n", err)
    }
}

Batch Producer (PostgreSQL/MSSQL → API Gateway)

Типичный сценарий: чтение данных из БД владельца данных (PostgreSQL, MSSQL) и отправка в API Gateway.

┌──────────────┐     ┌──────────┐     ┌─────────────┐
│ PostgreSQL/  │────▶│  Batch   │────▶│ API Gateway │
│   MSSQL      │     │  or CDC  │     │   (mTLS)    │
└──────────────┘     └──────────┘     └─────────────┘

Реализация:


Java

Maven Dependencies

<dependencies>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>1.11.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.5.14</version>
    </dependency>
</dependencies>

Код

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.ssl.SSLContextBuilder;

import javax.net.ssl.SSLContext;
import java.io.ByteArrayOutputStream;
import java.io.FileInputStream;
import java.security.KeyStore;
import java.util.ArrayList;
import java.util.List;

public class DataPlatformClient {

    private static final String API_GATEWAY_URL = "https://data-gateway.company.ru/api/1.0/sales.orders";
    private static final String CERT_FILE = "/secure/certs/sales.orders.p12";
    private static final String CERT_PASSWORD = "secret";
    private static final String VERSION = "2.1.0";

    private static final String AVRO_SCHEMA = 
        "{" +
        "  \"type\": \"record\"," +
        "  \"name\": \"Order\"," +
        "  \"namespace\": \"sales.orders\"," +
        "  \"fields\": [" +
        "    {\"name\": \"order_id\", \"type\": \"string\"}," +
        "    {\"name\": \"customer_id\", \"type\": \"string\"}," +
        "    {\"name\": \"total_amount\", \"type\": \"double\"}," +
        "    {\"name\": \"status\", \"type\": \"string\"}" +
        "  ]" +
        "}";

    public static byte[] serializeToAvro(List<GenericRecord> records) throws Exception {
        Schema schema = new Schema.Parser().parse(AVRO_SCHEMA);
        DatumWriter<GenericRecord> writer = new SpecificDatumWriter<>(schema);

        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null);

        for (GenericRecord record : records) {
            writer.write(record, encoder);
        }

        encoder.flush();
        return outputStream.toByteArray();
    }

    public static boolean sendToAPIGateway(List<GenericRecord> records) throws Exception {
        // Сериализация в Avro
        byte[] avroPayload = serializeToAvro(records);

        // Загрузка сертификата
        KeyStore keyStore = KeyStore.getInstance("PKCS12");
        keyStore.load(new FileInputStream(CERT_FILE), CERT_PASSWORD.toCharArray());

        SSLContext sslContext = SSLContextBuilder
            .create()
            .loadKeyMaterial(keyStore, CERT_PASSWORD.toCharArray())
            .build();

        SSLConnectionSocketFactory sslSocketFactory = new SSLConnectionSocketFactory(sslContext);

        CloseableHttpClient httpClient = HttpClients.custom()
            .setSSLSocketFactory(sslSocketFactory)
            .build();

        // Создание запроса
        HttpPost post = new HttpPost(API_GATEWAY_URL);
        post.setEntity(new ByteArrayEntity(avroPayload));
        post.setHeader("Content-Type", "application/avro");
        post.setHeader("version", VERSION);
        post.setHeader("batch", String.valueOf(records.size()));
        post.setHeader("producer-id", "my-java-app");

        // Отправка
        CloseableHttpResponse response = httpClient.execute(post);
        int statusCode = response.getStatusLine().getStatusCode();

        response.close();
        httpClient.close();

        if (statusCode == 201) {
            System.out.println("✅ Successfully sent " + records.size() + " records");
            return true;
        } else {
            System.out.println("❌ Error: " + statusCode);
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        Schema schema = new Schema.Parser().parse(AVRO_SCHEMA);

        List<GenericRecord> orders = new ArrayList<>();

        GenericRecord order1 = new GenericData.Record(schema);
        order1.put("order_id", "ord_123");
        order1.put("customer_id", "cust_456");
        order1.put("total_amount", 99.99);
        order1.put("status", "confirmed");
        orders.add(order1);

        sendToAPIGateway(orders);
    }
}

Заголовки (Headers)

Обязательные

Заголовок Описание Пример
Content-Type Тип контента application/avro
version Версия контракта 2.1.0
batch Количество записей 100

Опциональные (рекомендуются)

Заголовок Описание Пример
producer-id ID producer приложения my-app-v1
trace-id ID для трейсинга trace-20260123120000
shard Shard ID (для партиционирования) 0

Коды ответов

Код Описание Действие
201 Успешно принято ✅ Данные в Kafka
400 Неверный запрос Проверьте заголовки и payload
401 Нет сертификата Настройте mTLS
403 Неправильный CN CN должен совпадать с топиком
500 Ошибка сервера Повторите позже

Troubleshooting

Ошибка: SSLError - certificate verify failed

Причина: Неправильный CA сертификат

Решение:

# Скачайте актуальный CA cert от Data Platform team
curl https://data-gateway.company.ru/ca.crt -o /secure/certs/ca.crt

Ошибка: 403 Unauthorized

Причина: CN в сертификате не совпадает с топиком

Решение:

# Проверьте CN в вашем сертификате
openssl x509 -in /secure/certs/sales.orders.crt -noout -subject

# Должно быть: subject=CN=sales.orders

Ошибка: 400 Missing headers

Причина: Не указаны обязательные заголовки

Решение: Убедитесь что указаны version и batch


См. также