Kafka Platform
This document describes how to set up basic AWS MSK and standalone Apache Kafka systems. The Kafka platform can be used for Decision Export or User Activity Export.
A Kafka system has three main parts: Producer, Broker, Consumer.
Messages are addressed to a Topic, which acts as a named message queue. Multiple Topics can exist.
The Producer sends messages to a broker or a topic.
Topics are composed of partitions. Partitions can be distributed across brokers. Messages within a partition are ordered.
Each Kafka message contains a Key and Value.
The Styra DAS producer, for each decision, sends the decision system_id as the Key and the decision (JSON) as the Value to the configured broker topic.
The Styra DAS producer supports the following authentication methods: open, plaintext, SASL (PLAIN, SCRAM-SHA-256/512), server-TLS, mutual-TLS.
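To inspect the Key/Value layout described above, the console consumer can print each record's key next to its value. The following is a minimal sketch, assuming records are read with `kafka-console-consumer.sh --property print.key=true`, which separates key and value with a tab by default. The helper names `keys_only` and `values_only` are hypothetical and not part of Styra DAS or Kafka.

```shell
# Hypothetical helpers for reading "key<TAB>value" lines on stdin,
# as printed by the console consumer with --property print.key=true.

# keys_only: print only the record key (the decision system_id).
keys_only() {
  cut -f1
}

# values_only: print only the record value (the decision JSON).
values_only() {
  cut -f2-
}
```

For example, piping the consumer output through `keys_only | sort | uniq -c` would give a rough count of decisions per system.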
Using the /v1/workspace/KafkaConfig API
The Kafka configuration contains many parameters and authentication mechanisms:
Example API configuration
Create Test User /v1/secrets/kafkaconfig
curl --request PUT \
--url ''$DAS_TENANT'/v1/secrets/kafkaconfig' \
--header 'authorization: Bearer '$DAS_WORKSPACE_TOKEN'' \
--header 'content-type: application/json' \
--data \
'{
"name":"test",
"secret":"test"
}'
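The curl commands in this section rely on shell variables such as DAS_TENANT and DAS_WORKSPACE_TOKEN. A small guard can catch a missing variable before a request is sent. This is only a sketch: the function name `require_env` is hypothetical, and the variable names are taken from the commands in this document.

```shell
# require_env NAME...: return non-zero if any named variable is unset or empty.
require_env() {
  for name in "$@"; do
    # Indirect lookup of the variable whose name is stored in $name (POSIX sh).
    eval "value=\${$name:-}"
    if [ -z "$value" ]; then
      echo "missing environment variable: $name" >&2
      return 1
    fi
  done
}
```

A typical call before running the requests might be `require_env DAS_TENANT DAS_WORKSPACE_TOKEN DAS_BROKERS DAS_TOPIC`.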
Kafka OPEN Authentication
Perform a GET on /v1/workspace, then merge the decisions_exporter segment into the results and send them back in the update request.
curl --url ''$DAS_TENANT'/v1/workspace' --header 'authorization: Bearer '$DAS_WORKSPACE_TOKEN'' --header 'content-type: application/json'
curl --request PUT \
--url ''$DAS_TENANT'/v1/workspace' \
--header 'authorization: Bearer '$DAS_WORKSPACE_TOKEN'' \
--header 'content-type: application/json' \
--data \
'{
"decisions_exporter":{
"interval":"30s",
"kafka":{
"version":"2.6.2",
"authentication":"OPEN",
"brokers":['$DAS_BROKERS'],
"topic":'$DAS_TOPIC',
"required_acks":"WaitForAll"
}
}
}'
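The update request above splices DAS_BROKERS and DAS_TOPIC directly into the JSON body, so both values apparently need to carry their own JSON double quotes (for example `DAS_TOPIC='"styra-topic"'`). The sketch below turns a plain comma-separated broker list into that form. The helper name `quote_csv` and the broker addresses are hypothetical.

```shell
# quote_csv LIST: wrap each comma-separated item in double quotes so the
# result can be placed inside a JSON array, e.g. "brokers":[...].
quote_csv() {
  printf '%s\n' "$1" | sed 's/[^,][^,]*/"&"/g'
}

# Example with made-up broker addresses:
DAS_BROKERS=$(quote_csv 'b-1.example.invalid:9092,b-2.example.invalid:9092')
```

This assumes the broker list contains no spaces, since the variable is expanded unquoted in the request.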
Kafka TLS-server with SASL:SCRAM-SHA-512 Authentication
Perform a GET on /v1/workspace, then merge the decisions_exporter segment into the results and send them back in the update request.
curl --url ''$DAS_TENANT'/v1/workspace' --header 'authorization: Bearer '$DAS_WORKSPACE_TOKEN'' --header 'content-type: application/json'
curl --request POST \
--url ''$DAS_TENANT'/v1/workspace' \
--header 'authorization: Bearer '$DAS_WORKSPACE_TOKEN'' \
--header 'content-type: application/json' \
--data \
'{
"decisions_exporter":{
"kafka":{
"topic":'$DAS_TOPIC',
"brokers":['$DAS_BROKERS'],
"authentication":"SASL",
"required_acks":"WaitForAll",
"sasl":{
"mechanism":"SCRAM-SHA-512",
"plain":{
"user":"kafkaconfig"
}
},
"tls":{
"rootca":"-----BEGIN CERTIFICATE-----\nMIIDQTCCAimgAwIBAgITBmyfz5m/jAo54vB4ikPmljZbyjANBgkqhkiG9w0BAQsF\nADA5MQswCQYDVQQGEwJVUzEPMA0GA1UEChMGQW1hem9uMRkwFwYDVQQDExBBbWF6\nb24gUm9vdCBDQSAxMB4XDTE1MDUyNjAwMDAwMFoXDTM4MDExNzAwMDAwMFowOTEL\nMAkGA1UEBhMCVVMxDzANBgNVBAoTBkFtYXpvbjEZMBcGA1UEAxMQQW1hem9uIFJv\nb3QgQ0EgMTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBALJ4gHHKeNXj\nca9HgFB0fW7Y14h29Jlo91ghYPl0hAEvrAIthtOgQ3pOsqTQNroBvo3bSMgHFzZM\n9O6II8c+6zf1tRn4SWiw3te5djgdYZ6k/oI2peVKVuRF4fn9tBb6dNqcmzU5L/qw\nIFAGbHrQgLKm+a/sRxmPUDgH3KKHOVj4utWp+UhnMJbulHheb4mjUcAwhmahRWa6\nVOujw5H5SNz/0egwLX0tdHA114gk957EWW67c4cX8jJGKLhD+rcdqsq08p8kDi1L\n93FcXmn/6pUCyziKrlA4b9v7LWIbxcceVOF34GfID5yHI9Y/QCB/IIDEgEw+OyQm\njgSubJrIqg0CAwEAAaNCMEAwDwYDVR0TAQH/BAUwAwEB/zAOBgNVHQ8BAf8EBAMC\nAYYwHQYDVR0OBBYEFIQYzIU07LwMlJQuCFmcx7IQTgoIMA0GCSqGSIb3DQEBCwUA\nA4IBAQCY8jdaQZChGsV2USggNiMOruYou6r4lK5IpDB/G/wkjUu0yKGX9rbxenDI\nU5PMCCjjmCXPI6T53iHTfIUJrU6adTrCC2qJeHZERxhlbI1Bjjt/msv0tadQ1wUs\nN+gDS63pYaACbvXy8MWy7Vu33PqUXHeeE6V/Uq2V8viTO96LXFvKWlJbYK8U90vv\no/ufQJVtMVT8QtPHRh8jrdkPSHCa2XV4cdFyQzR1bldZwgJcJmApzyMZFo6IQ6XU\n5MsI+yMRQ+hDKXJioaldXgjUkK642M4UwtBV8ob2xJNDd2ZhwLnoQdeXeGADbkpy\nrqXRfboQnoZsG4q5WTP468SQvvG5\n-----END CERTIFICATE-----"
}
},
"interval":"30s"
}
}'
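The rootca value above is a PEM certificate written on one line, with each line break replaced by the two characters `\n`. A minimal sketch for producing that form from a PEM file follows. The helper name `pem_to_json_string` and the file name in the usage note are hypothetical.

```shell
# pem_to_json_string: read a PEM file on stdin and print it on one line,
# joining the original lines with a literal backslash-n so the result can
# be pasted between double quotes in a JSON document.
pem_to_json_string() {
  awk 'NR > 1 { printf "%s", "\\n" } { printf "%s", $0 } END { print "" }'
}
```

Assuming the CA certificate is stored in `rootca.pem`, `pem_to_json_string < rootca.pem` prints the string to place in the "rootca" field.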
Verify Example /v1/workspace/kafka/verify-config
curl --request PUT \
--url ''$DAS_TENANT'/v1/workspace/kafka/verify-config' \
--header 'authorization: Bearer '$DAS_WORKSPACE_TOKEN'' \
--header 'content-type: application/json' \
--data \
'{
"authentication":"OPEN",
"brokers":['$DAS_BROKERS'],
"topic":'$DAS_TOPIC',
"required_acks":"WaitForAll"
}'
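When the verify request is scripted, it may help to make the shell command fail if the request is rejected. The sketch below wraps the call with curl's `--fail` option, which returns a non-zero exit code for HTTP responses of 400 and above. Whether verify-config reports an invalid configuration through the HTTP status or only in the response body is an assumption here, and the function name `verify_kafka_config` is hypothetical.

```shell
# verify_kafka_config PAYLOAD: send a Kafka configuration (JSON) to the
# verify-config endpoint and return non-zero on an HTTP error status.
verify_kafka_config() {
  curl --fail --silent --show-error --request PUT \
    --url "$DAS_TENANT/v1/workspace/kafka/verify-config" \
    --header "authorization: Bearer $DAS_WORKSPACE_TOKEN" \
    --header 'content-type: application/json' \
    --data "$1"
}
```

A script could then run `verify_kafka_config "$payload" || echo "Kafka configuration rejected" >&2` before applying the same settings to /v1/workspace.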
Secure Kafka Platform Access
AWS MSK portal
Perform the following steps to create an AWS MSK cluster, create and set up a topic, create and associate a SASL user, make the cluster public, and collect the broker list.
Styra DAS does not support SASL:AWS_MSK_IAM authentication. Instead, use TLS-server with SASL:SCRAM-SHA-512 authentication.
For more information on AWS MSK, see the AWS MSK overview page.
- Create AWS MSK cluster
  - Follow the AWS MSK create cluster example to set up a Kafka cluster in your default VPC (takes up to 30 minutes).
  - See MSK Create cluster.
  - Consider adding these parameters to the Kafka cluster configuration:
    allow.everyone.if.no.acl.found=false
    auto.create.topics.enable=false
    delete.topic.enable=true
    log.retention.minutes=60
- Set up Topic
  - Create an EC2 instance. See EC2 instance.
  - Create a topic. See MSK Topic.
  - Set the topic retention period. See MSK retention period.
  - Test the producer and consumer. See MSK producer and consumer.
- Create and associate SASL user
  - See MSK SASL Users.
- Make cluster public (optional)
  - See MSK public access.
  - Set up user MSK ACLs.
  - Edit the cluster configuration and add allow.everyone.if.no.acl.found=false (takes up to 30 minutes).
  - Create and associate Elastic IPs with each broker network interface (ping each broker to get its local IP address).
  - Enable the PublicAccess option via the awscli.
- Collect broker list
  - Go to the AWS MSK portal and click View client information.
  - Collect the relevant broker list (private, or public if created).
- EC2 sample setup
Substitute your topic for styra-topic.
# set replication-factor to number of replicas in cluster and set partitions to distribute DAS systems load
bin/kafka-topics.sh --zookeeper <zookeeper> --create --partitions 1 --replication-factor 2 --topic styra-topic
bin/kafka-topics.sh --zookeeper <zookeeper> --list
# set a retention period
bin/kafka-configs.sh --zookeeper <zookeeper> --alter --entity-type topics --entity-name styra-topic --add-config retention.ms=60000
# set minimal ACLs for user 'user'
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=<zookeeper> --add --allow-principal User:user --operation All --topic 'styra-topic' --cluster
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=<zookeeper> --add --allow-principal User:user --topic 'styra-topic' --consumer --group '*'
# set up users_jaas.conf according to your security posture.
# a quoted heredoc keeps the double quotes around username and password
cat > /home/ec2-user/users_jaas.conf <<'EOF'
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="user"
password="user";
};
EOF
export KAFKA_OPTS=-Djava.security.auth.login.config=/home/ec2-user/users_jaas.conf
# consume messages
./bin/kafka-console-consumer.sh --bootstrap-server <private endpoint> --consumer.config client.properties --topic styra-topic
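The consumer command above reads its connection settings from client.properties, which this document does not show. The following is a minimal sketch of such a file for SASL/SCRAM over TLS, assuming the SCRAM-SHA-512 mechanism used elsewhere in this document. The helper name `write_client_properties` is hypothetical, and a Java truststore setting may also be required depending on the client environment.

```shell
# write_client_properties [PATH]: write a minimal Kafka client configuration
# for SASL/SCRAM over TLS (defaults to ./client.properties).
write_client_properties() {
  cat > "${1:-client.properties}" <<'EOF'
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
EOF
}
```

The user name and password are still taken from users_jaas.conf through KAFKA_OPTS, so they are not repeated in this file.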
Apache Kafka Standalone
Perform the steps in the Apache Kafka quickstart to create an Apache Kafka cluster and topic and use the default producer and consumer to test messages.