# Streaming data from Kafka in Styra Load
Styra Load can ingest data from Kafka topics and use it in policy evaluation. This is useful for keeping a frequently updated dataset fresh in Styra Load.
## Overview
In this tutorial we'll walk through how to use Styra Load's Kafka integration. To demo the Kafka functionality, we'll complete the following steps:
- Run a Kafka broker and Styra Load locally in containers
- Publish some data to a Kafka topic
- Query the data that was streamed to Styra Load
## Running Kafka and Styra Load using Docker Compose
Testing the Kafka integration requires a Kafka cluster for Styra Load to connect to. For this tutorial, we'll run a local containerized test "cluster" using the Confluent images for Kafka and ZooKeeper. If you already have a cluster running, you may skip this step. Since we're running a few containers, we'll use Docker Compose to orchestrate them.

Create a file called `docker-compose.yaml` and insert the following configuration:
```yaml
# docker-compose.yaml
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-kafka:7.3.0
    container_name: broker
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
  load:
    image: ghcr.io/styrainc/load:latest
    ports:
      - "8181:8181"
    command:
      - "run"
      - "--server"
      - "--log-level=debug"
      - "--config-file=/data/load-conf.yaml"
      - "/data/policy/transform.rego"
    environment:
      STYRA_LOAD_LICENSE_KEY: ${STYRA_LOAD_LICENSE_KEY}
    volumes:
      - "./:/data"
    depends_on:
      - broker
```
The Kafka deployment above uses the settings from the Kafka quickstart. It is appropriate for testing purposes only and is not suitable for production.
The `load` service mounts the current directory into `/data` in the container. This directory needs to contain a configuration file and a transformation policy that formats data as it is ingested into Styra Load.
Create a file called `load-conf.yaml` in the current directory. This file configures Styra Load to ingest data from Kafka and tells it which policy to use to transform the data as it is ingested. Insert the content below:
```yaml
# load-conf.yaml
plugins:
  data:
    kafka.messages:
      type: kafka
      urls:
        - broker:29092
      topics:
        - users
      rego_transform: "data.e2e.transform"
```
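Each message consumed from Kafka is handed to the `rego_transform` policy as `input`. As the transform policy below shows, this includes at least the topic name and the base64-encoded message value; the sketch here is inferred from that policy, and additional fields may be present (see the Kafka configuration docs):

```json
{
  "topic": "users",
  "value": "<base64-encoded message payload>"
}
```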
We also need to supply a policy to be loaded at `data.e2e.transform`. We do this by creating a local file called `transform.rego` in a `policy/` subdirectory, matching the `/data/policy/transform.rego` path used in the Compose file. The policy below ensures that only messages on the `users` topic are ingested, and that the data is transformed to include only the `name` and `roles` fields.

Insert the content below into the `policy/transform.rego` file so that it can be loaded into the Styra Load container.
```rego
# policy/transform.rego
package e2e

import future.keywords.contains
import future.keywords.if

transform contains {"op": "add", "path": payload.id, "value": val} if {
    input.topic == "users"
    payload := json.unmarshal(base64.decode(input.value))
    val := object.filter(payload, ["name", "roles"])
}
```
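Since the transform is ordinary Rego, you can optionally sanity-check it with plain OPA before wiring everything together. This sketch assumes you have the `opa` CLI, `jq`, and `base64` available; the `input.json` file name is just our choice:

```shell
# Build an input shaped like a Kafka message: topic plus base64-encoded payload
payload='{"id": "d9eccc5c", "name": "Alice", "roles": ["developer", "reports-reader"]}'
value="$(printf '%s' "$payload" | base64 | tr -d '\n')"
jq -n --arg v "$value" '{"topic": "users", "value": $v}' > input.json

# Evaluate the transform rule against the sample input
opa eval -d policy/transform.rego -i input.json 'data.e2e.transform' --format pretty
```

The result should be a single patch operation that adds Alice's `name` and `roles` under the `d9eccc5c` key.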
Before running Styra Load, we need to set the `STYRA_LOAD_LICENSE_KEY` environment variable, which the Compose file passes through to the container. A trial license is required to evaluate Styra Load. Please register for a license at the Styra Load Free Trial page.

```shell
export STYRA_LOAD_LICENSE_KEY=<license key here>
```
We can now bring up the demo with the following command:

```shell
docker-compose up
```
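Once the containers are running, you can optionally confirm that Styra Load is up before publishing any data. Styra Load serves OPA's standard health API, so a plain HTTP check against the port mapped in the Compose file works:

```shell
# Returns HTTP 200 with an empty JSON object once the server is ready
curl -i localhost:8181/health
```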
## Publishing messages to Kafka
With the containers up and running, it's time to test Styra Load's integration with Kafka. The following examples use the kcat tool to produce messages to Kafka topics, but any tool for producing messages will work.
We're going to publish some messages to the `users` topic, which we previously configured Styra Load to consume from. The data takes the form of one JSON object per line, stored in a file called `resources/users.jsonl`, which the producer command below reads from.

Create the `resources/users.jsonl` file now with the content below.
{"id": "d9eccc5c", "name": "Alice", "roles": ["developer", "reports-reader"]}
{"id": "5c0ba07e", "name": "Bob", "roles": ["reports-admin"]}
{"id": "413adc7a", "name": "Eve", "roles": ["database-reader", "database-writer"]}
Now we can use this data to publish messages to the `users` topic.

```shell
kcat -P -b localhost -t users < resources/users.jsonl
```
To verify that the users are now consumable on the `users` topic, we can invoke `kcat` as a consumer.
```shell
$ kcat -C -b localhost -t users
{"id": "d9eccc5c", "name": "Alice", "roles": ["developer", "reports-reader"]}
{"id": "5c0ba07e", "name": "Bob", "roles": ["reports-admin"]}
{"id": "413adc7a", "name": "Eve", "roles": ["database-reader", "database-writer"]}
% Reached end of topic users [0] at offset 3
```
## Querying the data streamed to Styra Load
The same data we saw via `kcat` can now be queried from Styra Load. A simple curl command verifies that the data has been ingested and transformed correctly.
```shell
$ curl -s "localhost:8181/v1/data/kafka/messages?pretty=true"
{
  "result": {
    "413adc7a": {
      "name": "Eve",
      "roles": [
        "database-reader",
        "database-writer"
      ]
    },
    "5c0ba07e": {
      "name": "Bob",
      "roles": [
        "reports-admin"
      ]
    },
    "d9eccc5c": {
      "name": "Alice",
      "roles": [
        "developer",
        "reports-reader"
      ]
    }
  }
}
```
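Once ingested, the streamed data is available to policies like any other document under `data`. As a minimal sketch (the `authz` package name and `input.user_id` field are our own illustrative choices, not part of the tutorial), a rule that grants access to report administrators could look like this:

```rego
package authz

import future.keywords.if
import future.keywords.in

default allow := false

# Allow users whose streamed Kafka record includes the reports-admin role
allow if "reports-admin" in data.kafka.messages[input.user_id].roles
```

Querying `data.authz.allow` with `{"user_id": "5c0ba07e"}` as input would return `true` for Bob, whose record carries the `reports-admin` role.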
## Further Reading
- The files used in the examples are also available in the Styra Load blueprints repo.
- View the Kafka configuration for Styra Load.