Kafka API
Styra Load's support for the Kafka API makes it possible to stream data updates to Styra Load. This can be useful when events representing changes to data used in policy evaluation are available on a Kafka topic.
The code used in this walkthrough can be found here.
Example Configuration
The Kafka integration is provided via the data plugin, and needs to be enabled in Load's configuration.
load-conf.yaml
plugins:
  data:
    kafka.messages:
      type: kafka
      urls:
        - broker:29092
      topics:
        - users
      rego_transform: "data.e2e.transform"
The above minimal configuration should be fairly straightforward, but a few things are worth mentioning:
- The kafka.messages key is used as a "namespace" by the plugin, meaning Load will store data ingested via Kafka under data.kafka.messages. Use whatever name makes the most sense for your application.
- The topics array lists the Kafka topics from which Load will consume messages.
- The rego_transform attribute allows us to apply a message transformer to any incoming message.
Message transformers
The rego_transform attribute specifies the path to a rule used to transform incoming messages into a format suitable for storage in Load. The raw input provided for each transform should be familiar to most Kafka users:
{
  "headers": "eyJzb21lIjogImhlYWRlciB2YWx1ZSJ9",
  "key": "",
  "timestamp": 1675346581,
  "topic": "resources",
  "value": "eyJwYXlpbmciOiAiYXR0ZW50aW9uIn0"
}
Most of the attributes are optional (i.e. their values may be empty), and the base64-encoded value is normally what we're most interested in. In our example, these provide a stream of JSON messages about users:
resources/users.jsonl
{"id": "d9eccc5c", "name": "Alice", "roles": ["developer", "reports-reader"]}
{"id": "5c0ba07e", "name": "Bob", "roles": ["reports-admin"]}
{"id": "413adc7a", "name": "Eve", "roles": ["database-reader", "database-writer"]}
...
A transform rule that stores each incoming message payload (or more concretely, user) indexed by its id might look something like this:
policy/transform.rego
package e2e
import future.keywords.contains
import future.keywords.if
transform contains {"op": "add", "path": payload.id, "value": val} if {
	input.topic == "users"
	payload := json.unmarshal(base64.decode(input.value))
	val := object.filter(payload, ["name", "roles"])
}
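For illustration, feeding the first user message from the stream above through this rule would produce a patch operation like the one below. These operations are emitted by the transform itself and never written by hand; the values shown simply follow from the example data:

{
  "op": "add",
  "path": "d9eccc5c",
  "value": {"name": "Alice", "roles": ["developer", "reports-reader"]}
}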
Transformer rules thus take incoming messages and use JSON patch operations to modify Load's data store. With the above transform applied to the series of messages we saw before, querying Load for data under data.kafka.messages would return:
{
  "413adc7a": {
    "name": "Eve",
    "roles": ["database-reader", "database-writer"]
  },
  "5c0ba07e": {
    "name": "Bob",
    "roles": ["reports-admin"]
  },
  "d9eccc5c": {
    "name": "Alice",
    "roles": ["developer", "reports-reader"]
  }
}
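To give an idea of how the ingested data might be consumed, the following is a hypothetical authorization policy. The package name, the input.user_id attribute and the role checked are illustrative assumptions, not part of the walkthrough:

package app.authz

import future.keywords.if
import future.keywords.in

# Deny by default.
default allow := false

# Allow requests from users that hold the "reports-admin" role, looking
# up the user's record in the data ingested from the Kafka topic.
allow if {
	"reports-admin" in data.kafka.messages[input.user_id].roles
}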
Kafka cluster
In order to test the Kafka integration with Styra Load, we'll need a Kafka cluster to connect to. For the purpose of this example, we'll run a local containerized test "cluster", using the Confluent images for Kafka and Zookeeper. If you have a cluster running already, you may skip this step. Since we'll be running a few containers, we'll use Docker Compose to orchestrate the procedure.
docker-compose.yaml
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.3.0
    container_name: broker
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

  load:
    image: ghcr.io/styrainc/load:latest
    ports:
      - "8181:8181"
    command:
      - "run"
      - "--server"
      - "--log-level=debug"
      - "--config-file=/data/load-conf.yaml"
      - "/data/policy/transform.rego"
    environment:
      STYRA_LOAD_LICENSE_KEY: ${STYRA_LOAD_LICENSE_KEY}
    volumes:
      - "./:/data"
    depends_on:
      - broker
The Kafka configuration above uses the settings from the Kafka quickstart, and is not suitable for production. For testing purposes, however, it'll do just fine. The load service mounts the current directory into /data in the container. Inside that directory, Load uses the two files we saw previously: load-conf.yaml and policy/transform.rego. The environment attribute sets the STYRA_LOAD_LICENSE_KEY environment variable inside the container, which may be provided via an .env file placed in the current directory:
STYRA_LOAD_LICENSE_KEY=<license key here>
With that in place, we should be able to start our containers:
$ docker-compose up
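Once the containers are running, a quick way to confirm that Load is up and serving requests is OPA's standard health endpoint (this assumes the default port mapping from the compose file above):

$ curl -s localhost:8181/health
{}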
Produce messages to a topic
With our containers up and running, we're ready to test the Load integration with Kafka. We'll be using the kcat tool to produce messages to Kafka topics in these examples, but any tool for producing messages should work. Given the list of users we saw before, we can store them in a JSON lines file (.jsonl) and have kcat produce one message per line in the file:
$ kcat -P -b localhost -t users < resources/users.jsonl
To verify that the users are now consumable on the users topic, invoke kcat as a consumer:
$ kcat -C -b localhost -t users
{"id": "d9eccc5c", "name": "Alice", "roles": ["developer", "reports-reader"]}
{"id": "5c0ba07e", "name": "Bob", "roles": ["reports-admin"]}
{"id": "413adc7a", "name": "Eve", "roles": ["database-reader", "database-writer"]}
% Reached end of topic users [0] at offset 3
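As a side note, kcat's -J flag wraps each consumed message in a JSON envelope with its metadata (topic, partition, offset, timestamp, key and payload), which roughly mirrors the fields the rego_transform rule receives as input:

$ kcat -C -b localhost -t users -J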
Query Load for Kafka data
Once we have validated that the messages are on our Kafka topic, we can also check that they've been ingested correctly by Load, and that our message transformer policy has worked as intended.
$ curl -s localhost:8181/v1/data/kafka/messages?pretty=true
{
  "result": {
    "413adc7a": {
      "name": "Eve",
      "roles": [
        "database-reader",
        "database-writer"
      ]
    },
    "5c0ba07e": {
      "name": "Bob",
      "roles": [
        "reports-admin"
      ]
    },
    "d9eccc5c": {
      "name": "Alice",
      "roles": [
        "developer",
        "reports-reader"
      ]
    }
  }
}
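Since Load serves the same Data API as OPA, individual entries can also be addressed directly by path. For example, given the data above, fetching only Bob's record:

$ curl -s localhost:8181/v1/data/kafka/messages/5c0ba07e?pretty=true
{
  "result": {
    "name": "Bob",
    "roles": [
      "reports-admin"
    ]
  }
}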
The files used in the examples are also available in the Load blueprints repo.