
Kafka Platform

This document describes how to set up basic AWS MSK and standalone Apache Kafka systems. The Kafka platform can be used with Decision Export or User Activity Export.

A Kafka system has three main parts: Producer, Broker, Consumer.

Messages are addressed to a named Topic, and multiple Topics can exist.

The Producer sends messages to a Topic on a Broker.

Topics are composed of partitions, which can be distributed across brokers. Messages within a partition are ordered.

Each Kafka message contains a Key and Value.

For each decision, the Styra DAS producer sends the decision system_id as the Key and the decision (JSON) as the Value to the configured broker topic.
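To spot-check what the producer writes, a consumer can print the Key alongside the Value. A minimal sketch using the stock console consumer; the broker address and the topic name styra-topic are placeholders for your setup:

```shell
# Print each exported decision as "<system_id> : <decision JSON>".
# <broker> and styra-topic are placeholders for your cluster and topic.
bin/kafka-console-consumer.sh --bootstrap-server <broker> \
  --topic styra-topic \
  --property print.key=true \
  --property "key.separator= : " \
  --from-beginning
```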

The Styra DAS producer supports the following authentication methods: open, plaintext, SASL (plain, scram 256/512), server-TLS, mutual-TLS.

Using /v1/workspace/KafkaConfig API

The Kafka configuration contains many parameters and authentication mechanisms:

Example API configuration

Create Test User /v1/secrets/kafkaconfig

curl --request PUT \
--url "$DAS_TENANT/v1/secrets/kafkaconfig" \
--header "authorization: Bearer $DAS_WORKSPACE_TOKEN" \
--header 'content-type: application/json' \
--data '{
"name":"test",
"secret":"test"
}'

Kafka OPEN Authentication

Perform a GET on /v1/workspace and merge the results with the decisions_exporter segment.

curl --url "$DAS_TENANT/v1/workspace" --header "authorization: Bearer $DAS_WORKSPACE_TOKEN" --header 'content-type: application/json'
curl --request PUT \
--url "$DAS_TENANT/v1/workspace" \
--header "authorization: Bearer $DAS_WORKSPACE_TOKEN" \
--header 'content-type: application/json' \
--data '{
"decisions_exporter":{
"interval":"30s",
"kafka":{
"version":"2.6.2",
"authentication":"OPEN",
"brokers":['$DAS_BROKERS'],
"topic":'$DAS_TOPIC',
"required_acks":"WaitForAll"
}
}
}'

Kafka TLS-server with SASL:SCRAM-SHA-512 Authentication

Perform a GET on /v1/workspace and merge the results with the decisions_exporter segment.

curl --url "$DAS_TENANT/v1/workspace" --header "authorization: Bearer $DAS_WORKSPACE_TOKEN" --header 'content-type: application/json'
curl --request PUT \
--url "$DAS_TENANT/v1/workspace" \
--header "authorization: Bearer $DAS_WORKSPACE_TOKEN" \
--header 'content-type: application/json' \
--data '{
"decisions_exporter":{
"kafka":{
"topic":'$DAS_TOPIC',
"brokers":['$DAS_BROKERS'],
"authentication":"SASL",
"required_acks":"WaitForAll",
"sasl":{
"mechanism":"SCRAM-SHA-512",
"plain":{
"user":"kafkaconfig"
}
},
"tls":{
"rootca":"-----BEGIN CERTIFICATE-----\nMIIDQTCCAimgAwIBAgITBmyfz5m/jAo54vB4ikPmljZbyjANBgkqhkiG9w0BAQsF\nADA5MQswCQYDVQQGEwJVUzEPMA0GA1UEChMGQW1hem9uMRkwFwYDVQQDExBBbWF6\nb24gUm9vdCBDQSAxMB4XDTE1MDUyNjAwMDAwMFoXDTM4MDExNzAwMDAwMFowOTEL\nMAkGA1UEBhMCVVMxDzANBgNVBAoTBkFtYXpvbjEZMBcGA1UEAxMQQW1hem9uIFJv\nb3QgQ0EgMTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBALJ4gHHKeNXj\nca9HgFB0fW7Y14h29Jlo91ghYPl0hAEvrAIthtOgQ3pOsqTQNroBvo3bSMgHFzZM\n9O6II8c+6zf1tRn4SWiw3te5djgdYZ6k/oI2peVKVuRF4fn9tBb6dNqcmzU5L/qw\nIFAGbHrQgLKm+a/sRxmPUDgH3KKHOVj4utWp+UhnMJbulHheb4mjUcAwhmahRWa6\nVOujw5H5SNz/0egwLX0tdHA114gk957EWW67c4cX8jJGKLhD+rcdqsq08p8kDi1L\n93FcXmn/6pUCyziKrlA4b9v7LWIbxcceVOF34GfID5yHI9Y/QCB/IIDEgEw+OyQm\njgSubJrIqg0CAwEAAaNCMEAwDwYDVR0TAQH/BAUwAwEB/zAOBgNVHQ8BAf8EBAMC\nAYYwHQYDVR0OBBYEFIQYzIU07LwMlJQuCFmcx7IQTgoIMA0GCSqGSIb3DQEBCwUA\nA4IBAQCY8jdaQZChGsV2USggNiMOruYou6r4lK5IpDB/G/wkjUu0yKGX9rbxenDI\nU5PMCCjjmCXPI6T53iHTfIUJrU6adTrCC2qJeHZERxhlbI1Bjjt/msv0tadQ1wUs\nN+gDS63pYaACbvXy8MWy7Vu33PqUXHeeE6V/Uq2V8viTO96LXFvKWlJbYK8U90vv\no/ufQJVtMVT8QtPHRh8jrdkPSHCa2XV4cdFyQzR1bldZwgJcJmApzyMZFo6IQ6XU\n5MsI+yMRQ+hDKXJioaldXgjUkK642M4UwtBV8ob2xJNDd2ZhwLnoQdeXeGADbkpy\nrqXRfboQnoZsG4q5WTP468SQvvG5\n-----END CERTIFICATE-----"
}
},
"interval":"30s"
}
}'
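The rootca field is a single JSON string with \n escapes between the PEM lines. One way to produce that string from a certificate file; the filename AmazonRootCA1.pem is just an example:

```shell
# Join a PEM file into one line, replacing real newlines with literal \n
# so the result can be pasted into the "rootca" JSON field.
awk 'NR>1{printf "\\n"} {printf "%s", $0}' AmazonRootCA1.pem
```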

Verify Example /v1/workspace/kafka/verify-config

curl --request PUT \
--url "$DAS_TENANT/v1/workspace/kafka/verify-config" \
--header "authorization: Bearer $DAS_WORKSPACE_TOKEN" \
--header 'content-type: application/json' \
--data '{
"authentication":"OPEN",
"brokers":['$DAS_BROKERS'],
"topic":'$DAS_TOPIC',
"required_acks":"WaitForAll"
}'
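Because $DAS_BROKERS and $DAS_TOPIC are spliced into the payload unquoted, they must expand to valid JSON fragments (quoted strings). The interpolated payload can be sanity-checked locally before calling the API; the broker and topic values below are examples:

```shell
# Example values: DAS_BROKERS expands to comma-separated quoted host:port
# strings, DAS_TOPIC to a quoted topic name.
DAS_BROKERS='"b-1.example.kafka.us-east-1.amazonaws.com:9096"'
DAS_TOPIC='"styra-topic"'

# Assemble the same payload verify-config receives and confirm it parses
# as JSON before issuing the request.
payload='{
"authentication":"OPEN",
"brokers":['$DAS_BROKERS'],
"topic":'$DAS_TOPIC',
"required_acks":"WaitForAll"
}'
echo "$payload" | python3 -m json.tool
```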

Secure Kafka Platform Access

AWS MSK portal

Perform the following steps to create an AWS MSK cluster, create and set up a topic, create and associate a SASL user, optionally make the cluster public, and collect the broker list.

Caution: Styra DAS does not support SASL:AWS_MSK_IAM authentication. Instead, use TLS-server with SASL:SCRAM-SHA-512 authentication.

For more information on AWS MSK, see the AWS MSK overview page.

  1. Create AWS MSK cluster

    • Follow the AWS MSK create cluster example to set up a Kafka cluster in your default VPC (takes up to 30 minutes).
    • See MSK Create cluster

    Consider adding these parameters to the Kafka cluster configuration:

    allow.everyone.if.no.acl.found=false
    auto.create.topics.enable=false
    delete.topic.enable=true
    log.retention.minutes=60
  2. Set up topic

  3. Create and associate SASL user

  4. Make cluster public (optional)

    • See MSK public access
    • Set up user MSK ACLs
    • Edit the cluster configuration and add allow.everyone.if.no.acl.found=false (takes up to 30 minutes)
    • Create and associate Elastic IPs with each broker network interface (ping each broker to get its local IP address)
    • Enable the PublicAccess option via the awscli
  5. Collect broker list

    • Go to the AWS MSK portal and click View client information
    • Collect the relevant broker list (private, or public if created)
  6. EC2 sample setup

    Substitute your topic for styra-topic.

    # set replication-factor to the number of replicas in the cluster and set partitions to distribute the DAS systems' load
    bin/kafka-topics.sh --zookeeper <zookeeper> --create --partitions 1 --replication-factor 2 --topic styra-topic
    bin/kafka-topics.sh --zookeeper <zookeeper> --list

    # set a retention period
    bin/kafka-configs.sh --zookeeper <zookeeper> --alter --entity-type topics --entity-name styra-topic --add-config retention.ms=60000

    # set minimal ACLs for user 'user'
    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=<zookeeper> --add --allow-principal User:user --operation All --topic 'styra-topic' --cluster
    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=<zookeeper> --add --allow-principal User:user --topic 'styra-topic' --consumer --group '*'

    # set up users_jaas.conf according to your security posture.
    # single quotes preserve the inner double quotes required by JAAS.
    echo 'KafkaClient {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="user"
    password="user";
    };' > /home/ec2-user/users_jaas.conf

    export KAFKA_OPTS=-Djava.security.auth.login.config=/home/ec2-user/users_jaas.conf

    # consume messages
    ./bin/kafka-console-consumer.sh --bootstrap-server <private endpoint> --consumer.config client.properties --topic styra-topic
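The cluster configuration parameters from step 1 can also be registered up front with the AWS CLI and attached to the cluster at creation time. A sketch; the configuration name styra-msk-config and the properties filename are placeholders:

```shell
# Write the broker properties from step 1 to a file.
cat > msk-config.properties <<'EOF'
allow.everyone.if.no.acl.found=false
auto.create.topics.enable=false
delete.topic.enable=true
log.retention.minutes=60
EOF

# Register the file as an MSK configuration (requires AWS credentials).
aws kafka create-configuration \
  --name styra-msk-config \
  --server-properties fileb://msk-config.properties
```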

Apache Kafka Standalone

Perform the steps in the Apache Kafka quickstart to create an Apache Kafka cluster and topic, and use the default producer and consumer to test messages.
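Once the quickstart broker is running, message flow can be smoke-tested end to end. The topic name quickstart-events follows the quickstart, and the commands assume the extracted Kafka distribution directory with a broker on localhost:9092:

```shell
# Create a topic on the local quickstart broker.
bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092

# Produce one test message.
echo "hello" | bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092

# Read it back, exiting after one message.
bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning \
  --max-messages 1 --bootstrap-server localhost:9092
```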