Kafka Platform

This document describes how to set up basic AWS MSK and standalone Apache Kafka systems. The Kafka platform can be used for Decision Export or User Activity Export.

A Kafka system has three main parts: Producer, Broker, Consumer.

A Topic is the named address of a Kafka message queue. Multiple Topics can exist.

The Producer sends messages to a Topic on a Broker.

Topics are composed of partitions. Partitions can be distributed across brokers, and messages within a partition are ordered.

Each Kafka message logically contains a Key and Value.

For each decision, the DAS producer sends the decision's system_id as the Key and the decision (JSON) as the Value to the configured broker and topic.
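
One consequence of keying each message by system_id is worth illustrating. Assuming the producer uses Kafka's usual key-hash partitioning (the source does not say which partitioner DAS uses), all decisions for one system land on the same partition and therefore stay in order. The sketch below uses `cksum` as a stand-in hash, since real Kafka clients use murmur2. The function name `partition_for_key` and the keys are hypothetical.

```shell
# Stand-in for key-hash partitioning. Real Kafka clients hash with murmur2;
# cksum is used here only because it is available in any POSIX shell.
partition_for_key() {
  key=$1
  partitions=$2
  hash=$(printf '%s' "$key" | cksum | cut -d ' ' -f 1)
  echo $(( hash % partitions ))
}

# Hypothetical system_id values on a topic with 3 partitions.
partition_for_key "system-a" 3
partition_for_key "system-b" 3
partition_for_key "system-a" 3   # same key as the first call, so same partition
```

The exact partition numbers differ from what a real broker would assign. The point is only that the mapping is deterministic per key.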

The DAS producer supports the following authentication methods: open, plaintext, SASL (PLAIN, SCRAM-SHA-256/512), server TLS, and mutual TLS.

Using /v1/workspace/KafkaConfig API

Kafka configuration contains many parameters and authentication mechanisms. The examples in this section cover the most common cases.

Example API configuration

Create Test User /v1/secrets/kafkaconfig

curl --request PUT \
--url "$DAS_TENANT/v1/secrets/kafkaconfig" \
--header "authorization: Bearer $DAS_WORKSPACE_TOKEN" \
--header 'content-type: application/json' \
--data \
'{
"name":"test",
"secret":"test"
}'

Kafka OPEN Authentication

Perform a GET on /v1/workspace and merge the decisions_exporter segment shown here into the result.

curl --url "$DAS_TENANT/v1/workspace" --header "authorization: Bearer $DAS_WORKSPACE_TOKEN" --header 'content-type: application/json'
curl --request PUT \
--url "$DAS_TENANT/v1/workspace" \
--header "authorization: Bearer $DAS_WORKSPACE_TOKEN" \
--header 'content-type: application/json' \
--data \
'{
"decisions_exporter":{
"interval":"30s",
"kafka":{
"version":"2.6.2",
"authentication":"OPEN",
"brokers":['"$DAS_BROKERS"'],
"topic":'"$DAS_TOPIC"',
"required_acks":"WaitForAll"
}
}
}'
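
The request body splices $DAS_BROKERS and $DAS_TOPIC straight into the JSON, so both variables must already expand to valid JSON fragments: a comma-separated list of quoted strings for the brokers and a quoted string for the topic. A minimal sketch of preparing them follows. The helper name `brokers_to_json` and the broker host names are hypothetical placeholders.

```shell
# Hypothetical helper: "host1:9092, host2:9092" -> "host1:9092","host2:9092"
brokers_to_json() {
  printf '%s\n' "$1" | sed -e 's/[[:space:]]*,[[:space:]]*/","/g' -e 's/^/"/' -e 's/$/"/'
}

# Placeholder values, not real endpoints.
DAS_BROKERS=$(brokers_to_json "broker-1.example.com:9092, broker-2.example.com:9092")
DAS_TOPIC='"styra-topic"'

# Show the fragments as they would appear inside the request body.
printf '"brokers":[%s]\n' "$DAS_BROKERS"
printf '"topic":%s\n' "$DAS_TOPIC"
```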

Kafka TLS-server with SASL:SCRAM-SHA-512 Authentication

Perform a GET on /v1/workspace and merge the decisions_exporter segment shown here into the result.

curl --url "$DAS_TENANT/v1/workspace" --header "authorization: Bearer $DAS_WORKSPACE_TOKEN" --header 'content-type: application/json'
curl --request POST \
--url "$DAS_TENANT/v1/workspace" \
--header "authorization: Bearer $DAS_WORKSPACE_TOKEN" \
--header 'content-type: application/json' \
--data \
'{
"decisions_exporter":{
"kafka":{
"topic":'"$DAS_TOPIC"',
"brokers":['"$DAS_BROKERS"'],
"authentication":"SASL",
"required_acks":"WaitForAll",
"sasl":{
"mechanism":"SCRAM-SHA-512",
"plain":{
"user":"kafkaconfig"
}
},
"tls":{
"rootca":"-----BEGIN CERTIFICATE-----\nMIIDQTCCAimgAwIBAgITBmyfz5m/jAo54vB4ikPmljZbyjANBgkqhkiG9w0BAQsF\nADA5MQswCQYDVQQGEwJVUzEPMA0GA1UEChMGQW1hem9uMRkwFwYDVQQDExBBbWF6\nb24gUm9vdCBDQSAxMB4XDTE1MDUyNjAwMDAwMFoXDTM4MDExNzAwMDAwMFowOTEL\nMAkGA1UEBhMCVVMxDzANBgNVBAoTBkFtYXpvbjEZMBcGA1UEAxMQQW1hem9uIFJv\nb3QgQ0EgMTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBALJ4gHHKeNXj\nca9HgFB0fW7Y14h29Jlo91ghYPl0hAEvrAIthtOgQ3pOsqTQNroBvo3bSMgHFzZM\n9O6II8c+6zf1tRn4SWiw3te5djgdYZ6k/oI2peVKVuRF4fn9tBb6dNqcmzU5L/qw\nIFAGbHrQgLKm+a/sRxmPUDgH3KKHOVj4utWp+UhnMJbulHheb4mjUcAwhmahRWa6\nVOujw5H5SNz/0egwLX0tdHA114gk957EWW67c4cX8jJGKLhD+rcdqsq08p8kDi1L\n93FcXmn/6pUCyziKrlA4b9v7LWIbxcceVOF34GfID5yHI9Y/QCB/IIDEgEw+OyQm\njgSubJrIqg0CAwEAAaNCMEAwDwYDVR0TAQH/BAUwAwEB/zAOBgNVHQ8BAf8EBAMC\nAYYwHQYDVR0OBBYEFIQYzIU07LwMlJQuCFmcx7IQTgoIMA0GCSqGSIb3DQEBCwUA\nA4IBAQCY8jdaQZChGsV2USggNiMOruYou6r4lK5IpDB/G/wkjUu0yKGX9rbxenDI\nU5PMCCjjmCXPI6T53iHTfIUJrU6adTrCC2qJeHZERxhlbI1Bjjt/msv0tadQ1wUs\nN+gDS63pYaACbvXy8MWy7Vu33PqUXHeeE6V/Uq2V8viTO96LXFvKWlJbYK8U90vv\no/ufQJVtMVT8QtPHRh8jrdkPSHCa2XV4cdFyQzR1bldZwgJcJmApzyMZFo6IQ6XU\n5MsI+yMRQ+hDKXJioaldXgjUkK642M4UwtBV8ob2xJNDd2ZhwLnoQdeXeGADbkpy\nrqXRfboQnoZsG4q5WTP468SQvvG5\n-----END CERTIFICATE-----"
}
},
"interval":"30s"
}
}'
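
The rootca value is a PEM certificate flattened onto one line, with a literal `\n` escape where each line break was. A minimal sketch of producing that form from a PEM file follows. The helper name `pem_to_json_string`, the variable `DAS_ROOTCA`, and the dummy file contents are hypothetical and stand in for your own CA file.

```shell
# Hypothetical helper: print a PEM file as one line, joining lines with a literal \n.
pem_to_json_string() {
  awk 'NR > 1 { printf "\\n" } { sub(/\r$/, ""); printf "%s", $0 }' "$1"
}

# Demo with a dummy file; the middle line is a placeholder, not a real certificate.
pem_file=$(mktemp)
printf '%s\n' '-----BEGIN CERTIFICATE-----' 'PLACEHOLDER' '-----END CERTIFICATE-----' > "$pem_file"
DAS_ROOTCA=$(pem_to_json_string "$pem_file")
rm -f "$pem_file"

# printf with %s leaves the backslash escapes untouched.
printf '"rootca":"%s"\n' "$DAS_ROOTCA"
```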

Verify Example /v1/workspace/kafka/verify-config

curl --request PUT \
--url "$DAS_TENANT/v1/workspace/kafka/verify-config" \
--header "authorization: Bearer $DAS_WORKSPACE_TOKEN" \
--header 'content-type: application/json' \
--data \
'{
"authentication":"OPEN",
"brokers":['"$DAS_BROKERS"'],
"topic":'"$DAS_TOPIC"',
"required_acks":"WaitForAll"
}'
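
Every example in this section depends on the same four shell variables, and an empty one produces a malformed URL or request body. A small pre-flight check, sketched below, can catch that before calling verify-config. The function name `check_kafka_env` is hypothetical.

```shell
# Hypothetical pre-flight helper: fail early when a variable the curl examples use is empty.
check_kafka_env() {
  for name in DAS_TENANT DAS_WORKSPACE_TOKEN DAS_BROKERS DAS_TOPIC; do
    eval "value=\${$name:-}"
    if [ -z "$value" ]; then
      echo "missing environment variable: $name" >&2
      return 1
    fi
  done
}

if check_kafka_env; then
  echo "environment looks complete"
else
  echo "set the missing variable before calling verify-config"
fi
```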

Secure Kafka Platform Access

AWS MSK portal

Perform the following steps to create an AWS MSK cluster, create and set up a topic, create and associate a SASL user, make the cluster public, and collect the broker list.

warning

DAS does not support SASL:AWS_MSK_IAM authentication. Use TLS-server with SASL:SCRAM-SHA-512 authentication instead.

For more information on AWS MSK, see the AWS MSK overview page.

  1. Create AWS MSK cluster

    • Follow the AWS MSK create cluster example to set up a Kafka cluster in your default VPC (takes up to 30 minutes).
    • See MSK Create cluster

    Consider adding these parameters to the kafka cluster configuration:
    allow.everyone.if.no.acl.found=false
    auto.create.topics.enable=false
    delete.topic.enable=true
    log.retention.minutes=60
  2. Set up topic

    See the topic commands in the EC2 sample setup (step 6).

  3. Create and associate SASL user

    See MSK SASL Users

  4. Make public (optional)

    • See MSK public access
    • Set up user ACLs; see MSK ACLs
    • Edit the cluster configuration and add allow.everyone.if.no.acl.found=false (takes up to 30 minutes)
    • Create and associate Elastic IPs with each broker network interface (ping each broker to get its local IP address)
    • Enable the PublicAccess option via the AWS CLI
  5. Collect broker list

    Go to the AWS MSK portal and click View client information. Collect the relevant broker list (private, or public if created).

  6. EC2 sample setup

    Substitute your topic for 'styra-topic'.

    # set replication-factor to number of replicas in cluster and set partitions to distribute DAS systems load
    bin/kafka-topics.sh --zookeeper <zookeeper> --create --partitions 1 --replication-factor 2 --topic styra-topic
    bin/kafka-topics.sh --zookeeper <zookeeper> --list

    # set a retention period
    bin/kafka-configs.sh --zookeeper <zookeeper> --alter --entity-type topics --entity-name styra-topic --add-config retention.ms=60000

    # set minimal ACLs for user 'user'
    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=<zookeeper> --add --allow-principal User:user --operation All --topic 'styra-topic' --cluster
    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=<zookeeper> --add --allow-principal User:user --topic 'styra-topic' --consumer --group '*'

    # set up users_jaas.conf according to your security posture
    # (single quotes keep the double quotes around the username and password values)
    echo 'KafkaClient {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="user"
    password="user";
    };' > /home/ec2-user/users_jaas.conf

    export KAFKA_OPTS=-Djava.security.auth.login.config=/home/ec2-user/users_jaas.conf

    # consume messages
    ./bin/kafka-console-consumer.sh --bootstrap-server <private endpoint> --consumer.config client.properties --topic styra-topic
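
The consumer command above reads client.properties, which this page does not show. A minimal sketch of that file follows, assuming the SASL:SCRAM-SHA-512 over TLS setup recommended earlier; adjust it if your listener uses a different protocol or mechanism. The credentials come from the users_jaas.conf referenced by KAFKA_OPTS, so they are not repeated here.

```shell
# Assumed client settings for a SASL/SCRAM listener over TLS.
cat > client.properties <<'EOF'
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
EOF

cat client.properties
```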

Apache Kafka Standalone

Perform the following steps to create an Apache Kafka cluster and topic, then use the default producer and consumer to test messages.

See Apache Kafka quickstart