Kafka Datasource Configuration

Styra Load's support for Apache Kafka makes it possible to stream data updates to Styra Load. This can be useful when events representing changes to data used in policy evaluation are available on a Kafka topic.

Example Configuration

The Kafka integration is provided through the data plugin, and needs to be enabled in Styra Load's configuration.

load-conf.yaml (minimal)

plugins:
  data:
    kafka.messages:
      type: kafka
      urls:
      - broker:29092
      topics:
      - users
      rego_transform: "data.e2e.transform"

In addition to the minimal configuration above, note the following:

  • The kafka.messages key is used as a "namespace" by the plugin, so data ingested through Kafka is stored under data.kafka.messages. Use whichever name makes the most sense for your application.
  • The topics array lists the Kafka topics from which Styra Load consumes messages.
  • The rego_transform attribute specifies a transform policy to apply to every incoming message.
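Once ingested, the data is available to other policies under the configured namespace. As a sketch, assuming a rego_transform policy that stores user entries keyed by ID (the user_id input attribute and roles field here are hypothetical, and depend entirely on your transform):

```
package example

# Look up the roles stored under data.kafka.messages for a given user.
# The structure below this path is whatever your rego_transform produced.
user_roles := data.kafka.messages[input.user_id].roles
```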

Consumer Offset

The default configuration will retrieve all messages persisted for the requested topic.

To only consume messages starting from a certain point in time, set from to one of:

  • "start": consume all messages (default)
  • "end": consume from the end, i.e., all new messages from the point of connecting
  • a duration, e.g. "1h", to get all messages starting from those published 1 hour ago.

For example, the following configuration subscribes to messages published no more than ten minutes ago:

plugins:
  data:
    kafka.messages:
      type: kafka
      urls:
      - broker:29092
      topics:
      - users
      from: 10m
      rego_transform: "data.e2e.transform"

Message Transformers

The rego_transform attribute specifies the path to a rule used to transform incoming messages into a format suitable for storage in Styra Load. The raw input provided for each transform should be familiar to most Kafka users:

{
  "headers": "eyJzb21lIjogImhlYWRlciB2YWx1ZSJ9",
  "key": "",
  "timestamp": 1675346581,
  "topic": "users",
  "value": "eyJwYXlpbmciOiAiYXR0ZW50aW9uIn0"
}

Most of the attributes are optional and may be empty; the base64-encoded value attribute typically carries the message payload.
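Inside a transform policy, the value field can be decoded with Rego's built-ins. With the sample message above, the base64 strings decode to plain JSON objects ({"some": "header value"} for headers, {"paying": "attention"} for value):

```
# Decode the base64-encoded message payload into a Rego object.
payload := json.unmarshal(base64.decode(input.value))
# With the sample message above: payload == {"paying": "attention"}
```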

rego_transform policies take incoming messages as input and return JSON Patch operations that modify Styra Load's data store. A policy might, for example:

  • Filter out, or target, operations for messages from a particular topic
  • Ingest only selected fields from a message
  • Switch between adding and removing data from the data store using the JSON Patch syntax
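As a sketch of that last point, a rule that removes an entry when an incoming message marks it as deleted might look like this (the deleted attribute in the payload is a hypothetical convention for this example, not something the plugin provides):

```
transform contains {"op": "remove", "path": payload.id} if {
  input.topic == "users"

  payload := json.unmarshal(base64.decode(input.value))
  payload.deleted == true
}
```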

An example policy that applies a transformation to messages from the users topic is shown below:

package e2e

import future.keywords.contains
import future.keywords.if

transform contains {"op": "add", "path": payload.id, "value": val} if {
  input.topic == "users"

  payload := json.unmarshal(base64.decode(input.value))
  val := object.filter(payload, ["name", "roles"])
}
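To illustrate, given a hypothetical message on the users topic whose decoded value is {"id": "alice", "name": "Alice", "roles": ["admin"], "active": true}, the rule above would emit the following patch operation, with object.filter dropping the fields not listed:

```
{"op": "add", "path": "alice", "value": {"name": "Alice", "roles": ["admin"]}}
```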

Further reading