Streaming data from Kafka in Styra Load

Styra Load can ingest data from Kafka topics and use it in policy evaluation. This is useful for keeping frequently updated datasets fresh in Styra Load.

Overview

In this tutorial, we'll walk through how to use Styra Load's Kafka integration. To demo the Kafka functionality, we'll complete the following steps:

  • Run a Kafka broker and Styra Load locally in containers
  • Publish some data to a Kafka topic
  • Query the data that was streamed to Styra Load

Running Kafka and Styra Load using Docker Compose

Testing the Kafka integration requires a Kafka cluster. For this tutorial, we'll run a local containerized test "cluster", using the Confluent images for Kafka and ZooKeeper.

note

If you already have a cluster running, you can skip this step. This tutorial runs a few containers and uses Docker Compose to orchestrate them.

Create a file called docker-compose.yaml with the following configuration:

# docker-compose.yaml
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-kafka:7.3.0
    container_name: broker
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
  load:
    image: ghcr.io/styrainc/load:latest
    ports:
      - "8181:8181"
    command:
      - "run"
      - "--server"
      - "--log-level=debug"
      - "--config-file=/data/load-conf.yaml"
      - "/data/transform.rego"
    environment:
      STYRA_LOAD_LICENSE_KEY: ${STYRA_LOAD_LICENSE_KEY}
    volumes:
      - "./:/data"
    depends_on:
      - broker

danger

The Kafka deployment above uses settings from the Kafka quickstart and is suitable for testing purposes only; it is not appropriate for production use.

The load service mounts the current directory at /data in the container. This directory needs to contain a configuration file and a transform policy used to format data as it is ingested into Styra Load.
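
For reference, by the end of this tutorial the current directory will contain the following files (users.jsonl is created in a later step):

.
├── docker-compose.yaml   # container definitions
├── load-conf.yaml        # Styra Load configuration
├── transform.rego        # transform policy
└── users.jsonl           # sample messages to publish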

Create a file called load-conf.yaml in the current directory. This file configures Styra Load to ingest data from Kafka, and tells it which policy to use to transform the data as it is ingested:

# load-conf.yaml
plugins:
  data:
    kafka.messages:
      type: kafka
      urls:
        - broker:29092
      topics:
        - users
      rego_transform: "data.e2e.transform"
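
The kafka.messages key determines where the ingested data is mounted in the data tree: transformed messages will appear under data.kafka.messages, which is the path we query at the end of this tutorial.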

We also need to supply the transform policy referenced above at data.e2e.transform. Create a file called transform.rego in the current directory with the content below. This policy ensures that only messages on the users topic are ingested, and that each message is transformed to include only its name and roles fields.

# transform.rego
package e2e

import future.keywords.contains
import future.keywords.if

transform contains {"op": "add", "path": payload.id, "value": val} if {
    input.topic == "users"

    payload := json.unmarshal(base64.decode(input.value))
    val := object.filter(payload, ["name", "roles"])
}
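
To illustrate: for each consumed message, Styra Load evaluates the transform with the topic and the base64-encoded message value available in input, as the policy above shows. A message on the users topic carrying Alice's record (published later in this tutorial) would yield a patch operation shaped like the following — a hypothetical illustration of the rule's output, not something you would see printed:

{"op": "add", "path": "d9eccc5c", "value": {"name": "Alice", "roles": ["developer", "reports-reader"]}}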

Before running Styra Load, we need to set the STYRA_LOAD_LICENSE_KEY environment variable in the shell where Docker Compose will run, so it can be interpolated into the Compose file.

note

A trial license is required to evaluate Styra Load. Please register for a license at the Styra Load Free Trial page.

export STYRA_LOAD_LICENSE_KEY=<license key here>

We can now bring up the demo with the following command:

docker-compose up
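
Once the containers are running, we can check that Styra Load is serving requests. Since Styra Load exposes an OPA-compatible REST API, the health endpoint should return an empty object (assuming port 8181 is mapped as in the Compose file above):

$ curl -s localhost:8181/health
{}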

Publishing messages to Kafka

With the containers up and running, it's time to test the Styra Load integration with Kafka.

note

The following examples use the kcat tool to produce and consume Kafka messages, but any tool for producing messages should work.

We're going to publish some messages to the users topic, which we previously configured Styra Load to consume from. The data will be one JSON object per line, stored in a file called users.jsonl.

You can create the users.jsonl file now with the content below.

{"id": "d9eccc5c", "name": "Alice", "roles": ["developer", "reports-reader"]}
{"id": "5c0ba07e", "name": "Bob", "roles": ["reports-admin"]}
{"id": "413adc7a", "name": "Eve", "roles": ["database-reader", "database-writer"]}

Now we can use this data to publish messages to the users topic.

kcat -P -b localhost -t users < users.jsonl
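
If you don't have kcat installed, the Confluent Kafka image ships with console tooling that should work as well, for example (assuming the broker container name from the Compose file above):

docker exec -i broker kafka-console-producer --bootstrap-server localhost:9092 --topic users < users.jsonl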

To verify that the user messages are now consumable from the users topic, we can invoke kcat as a consumer.

$ kcat -C -b localhost -t users
{"id": "d9eccc5c", "name": "Alice", "roles": ["developer", "reports-reader"]}
{"id": "5c0ba07e", "name": "Bob", "roles": ["reports-admin"]}
{"id": "413adc7a", "name": "Eve", "roles": ["database-reader", "database-writer"]}
% Reached end of topic users [0] at offset 3

Querying the data streamed to Styra Load

The same data we just saw via kcat can now be queried from Styra Load.

We can run a simple curl command to verify that the data has been ingested and transformed correctly.

$ curl -s "localhost:8181/v1/data/kafka/messages?pretty=true"
{
  "result": {
    "413adc7a": {
      "name": "Eve",
      "roles": [
        "database-reader",
        "database-writer"
      ]
    },
    "5c0ba07e": {
      "name": "Bob",
      "roles": [
        "reports-admin"
      ]
    },
    "d9eccc5c": {
      "name": "Alice",
      "roles": [
        "developer",
        "reports-reader"
      ]
    }
  }
}
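
With the data in place, policies can reference it like any other document under data. As a hypothetical example (not part of this tutorial's setup), an authorization policy could check a user's streamed roles:

# authz.rego (hypothetical example)
package authz

import future.keywords.if
import future.keywords.in

default allow := false

# Allow a request if the user has the "reports-admin" role
# in the dataset streamed from Kafka.
allow if {
    "reports-admin" in data.kafka.messages[input.user_id].roles
}

Evaluating this policy with the input {"user_id": "5c0ba07e"} would allow the request, since Bob holds the reports-admin role in the data we published.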

Further Reading