Kafka API

Styra Load's support for the Kafka API makes it possible to stream data updates into Load. This is useful when events representing changes to data used in policy evaluation are available on a Kafka topic.

note

The code used in this walkthrough can be found here.

Example Configuration

The Kafka integration is provided via the data plugin, and needs to be enabled in Load's configuration.

load-conf.yaml

plugins:
  data:
    kafka.messages:
      type: kafka
      urls:
        - broker:29092
      topics:
        - users
      rego_transform: "data.e2e.transform"

The minimal configuration above should be fairly straightforward, but a few things are worth mentioning:

  • The kafka.messages key is used as a "namespace" by the plugin: data ingested via Kafka will be available under data.kafka.messages. Use whatever name makes the most sense for your application (see the sketch after this list).
  • The topics array lists the Kafka topics from which Load consumes messages.
  • The rego_transform attribute lets us apply a message transformer to any incoming message.
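
Once data is flowing, policies can reference it under data.kafka.messages like any other document. A hypothetical rule against the user records we'll ingest below (the package name and input attributes are ours, for illustration):

package authz

import future.keywords.if
import future.keywords.in

# Grant access to developers. User records ingested via the Kafka
# plugin are available under data.kafka.messages, keyed by user ID.
allow if "developer" in data.kafka.messages[input.user_id].roles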

Message transformers

The rego_transform attribute specifies the path to a rule used to transform incoming messages into a format suitable for storage in Load. The raw input provided for each transform should be familiar to most Kafka users:

{
  "headers": "eyJzb21lIjogImhlYWRlciB2YWx1ZSJ9",
  "key": "",
  "timestamp": 1675346581,
  "topic": "resources",
  "value": "eyJwYXlpbmciOiAiYXR0ZW50aW9uIn0"
}
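
The base64-encoded value is normally what we're most interested in, and decoding it is a one-liner using Rego built-ins (a minimal sketch; the rule name is ours):

package example

# Decode the base64-encoded message value and parse it as JSON.
# For the sample message above, this evaluates to {"paying": "attention"}.
payload := json.unmarshal(base64.decode(input.value))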

Most of the attributes are optional (i.e. their values may be empty). In our example, the message values carry a stream of JSON documents describing users:

resources/users.jsonl

{"id": "d9eccc5c", "name": "Alice", "roles": ["developer", "reports-reader"]}
{"id": "5c0ba07e", "name": "Bob", "roles": ["reports-admin"]}
{"id": "413adc7a", "name": "Eve", "roles": ["database-reader", "database-writer"]}
...

A transform rule that stores each incoming message payload (or more concretely, each user) indexed by its id might look something like this:

policy/transform.rego

package e2e

import future.keywords.contains
import future.keywords.if

transform contains {"op": "add", "path": payload.id, "value": val} if {
    input.topic == "users"

    payload := json.unmarshal(base64.decode(input.value))
    val := object.filter(payload, ["name", "roles"])
}
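
Given Alice's message from the stream above, this rule evaluates to a set containing a single patch operation (shown here as JSON):

{
  "op": "add",
  "path": "d9eccc5c",
  "value": {"name": "Alice", "roles": ["developer", "reports-reader"]}
}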

Transformer rules take incoming messages and use JSON Patch operations to modify Load's data store. Had the above transform been applied to the series of messages we saw before, querying Load for data under data.kafka.messages would return:

{
  "413adc7a": {
    "name": "Eve",
    "roles": ["database-reader", "database-writer"]
  },
  "5c0ba07e": {
    "name": "Bob",
    "roles": ["reports-admin"]
  },
  "d9eccc5c": {
    "name": "Alice",
    "roles": ["developer", "reports-reader"]
  }
}

Kafka cluster

In order to test the Kafka integration with Styra Load, we'll need a Kafka cluster to connect to. For the purpose of this example, we'll run a local containerized test "cluster", using the Confluent images for Kafka and Zookeeper. If you have a cluster running already, you may skip this step. Since we'll be running a few containers, we'll use Docker Compose to orchestrate the procedure.

docker-compose.yaml

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-kafka:7.3.0
    container_name: broker
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
  load:
    image: ghcr.io/styrainc/load:latest
    ports:
      - "8181:8181"
    command:
      - "run"
      - "--server"
      - "--log-level=debug"
      - "--config-file=/data/load-conf.yaml"
      - "/data/policy/transform.rego"
    environment:
      STYRA_LOAD_LICENSE_KEY: ${STYRA_LOAD_LICENSE_KEY}
    volumes:
      - "./:/data"
    depends_on:
      - broker

The Kafka configuration above uses the settings from the Kafka quickstart and is not suitable for production. For testing purposes, however, it'll do just fine. The load service mounts the current directory into /data in the container. Inside that directory, Load uses two files: load-conf.yaml and policy/transform.rego, as seen previously. The environment attribute sets the STYRA_LOAD_LICENSE_KEY environment variable inside the container, which may be provided via an .env file placed in the current directory:

.env

STYRA_LOAD_LICENSE_KEY=<license key here>

With that in place, we should be able to start our containers:

$ docker-compose up
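
Before producing any messages, we can optionally verify that Load is up and serving requests; this assumes the port mapping from the Compose file above:

$ curl -s localhost:8181/health
{}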

Produce messages to a topic

With our containers up and running, we're ready to test Load's integration with Kafka. We'll be using the kcat tool to produce messages to Kafka topics in these examples, but any tool for producing messages should work. Given the list of users we saw before, we can store them in a JSON Lines file (.jsonl) and have kcat produce one message per line in the file:

$ kcat -P -b localhost -t users < resources/users.jsonl

To verify that the users are now consumable on the users topic, invoke kcat as a consumer:

$ kcat -C -b localhost -t users
{"id": "d9eccc5c", "name": "Alice", "roles": ["developer", "reports-reader"]}
{"id": "5c0ba07e", "name": "Bob", "roles": ["reports-admin"]}
{"id": "413adc7a", "name": "Eve", "roles": ["database-reader", "database-writer"]}
% Reached end of topic users [0] at offset 3
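
kcat can also print per-message metadata, which mirrors the raw input handed to the transform rule (the -f format tokens below are kcat's; output will vary):

$ kcat -C -b localhost -t users -f 'topic=%t partition=%p offset=%o key=%k value=%s\n'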

Query Load for Kafka data

Once we have validated that the messages are on our Kafka topic, we can also check that they've been ingested correctly by Load, and that our message transformer policy has worked as intended.

$ curl -s localhost:8181/v1/data/kafka/messages?pretty=true
{
  "result": {
    "413adc7a": {
      "name": "Eve",
      "roles": [
        "database-reader",
        "database-writer"
      ]
    },
    "5c0ba07e": {
      "name": "Bob",
      "roles": [
        "reports-admin"
      ]
    },
    "d9eccc5c": {
      "name": "Alice",
      "roles": [
        "developer",
        "reports-reader"
      ]
    }
  }
}
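
Since the transformed messages live in Load's regular data store, individual entries can be retrieved by extending the path. For example, to fetch only Alice's record (the ID below comes from our sample data):

$ curl -s localhost:8181/v1/data/kafka/messages/d9eccc5c?pretty=true
{
  "result": {
    "name": "Alice",
    "roles": [
      "developer",
      "reports-reader"
    ]
  }
}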

The files used in the examples are also available in the Load blueprints repo.