Kafka Datasource Configuration
Enterprise OPA's support for Apache Kafka makes it possible to stream data updates to Enterprise OPA. This can be useful when events representing changes to data used in policy evaluation are available on a Kafka topic.
Example Configuration
The Kafka integration is provided through the data plugin, and needs to be enabled in Enterprise OPA's configuration.
enterprise-opa-conf.yaml (minimal)
plugins:
  data:
    kafka.messages:
      type: kafka
      urls:
      - broker:29092
      topics:
      - users
      rego_transform: "data.e2e.transform"
In addition to the minimal configuration above, note the following:
- The kafka.messages key is used as a "namespace" by the plug-in, so data ingested through Kafka will be available under data.kafka.messages (see the sketch after this list). Use whatever name makes the most sense for your application.
- The topics array lists the Kafka topics from which Enterprise OPA consumes messages.
- The rego_transform attribute specifies a transform to apply to each incoming message.
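For example, once messages have been ingested and transformed, other policies can read them from the configured namespace. Below is a minimal sketch of such a policy; the authz package name, the input.user_id field, and the shape of the stored user objects (as produced by the transform shown later) are assumptions for illustration:

package authz

import future.keywords.if
import future.keywords.in

# Look up the requesting user among the data ingested from Kafka and
# allow access when that user carries the "admin" role.
# Package name, input fields, and data shape are illustrative assumptions.
allow if {
  user := data.kafka.messages[input.user_id]
  "admin" in user.roles
}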
Consumer Offset
The default configuration will retrieve all messages persisted for the requested topic.
To only consume messages starting from a certain point in time, you can configure from to one of:
- "start": consume all messages (default)
- "end": consume from the end, i.e., only new messages from the point of connecting
- a duration, e.g. "1h", to get all messages starting from those published 1 hour ago
For example, a configuration that subscribes to messages starting from those published no more than ten minutes ago is:
plugins:
  data:
    kafka.messages:
      type: kafka
      urls:
      - broker:29092
      topics:
      - users
      from: 10m
      rego_transform: "data.e2e.transform"
Message Transformers
The rego_transform attribute specifies the path to a rule used to transform incoming messages into a format suitable for storage in Enterprise OPA. The raw input provided to each transform should be familiar to most Kafka users:
{
  "headers": "eyJzb21lIjogImhlYWRlciB2YWx1ZSJ9",
  "key": "",
  "timestamp": 1675346581,
  "topic": "users",
  "value": "eyJwYXlpbmciOiAiYXR0ZW50aW9uIn0"
}
Most of the attributes are optional (that is, their values may be empty), and the base64-encoded value field is typically what a transform uses.
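For instance, the value of the sample message above can be decoded with the standard base64.decode and json.unmarshal built-ins. A small self-contained sketch (the example package name is invented):

package example

# Base64-decode the sample message's value, then parse it as JSON.
decoded_value := json.unmarshal(base64.decode("eyJwYXlpbmciOiAiYXR0ZW50aW9uIn0"))

Evaluating decoded_value here yields {"paying": "attention"}; the same decode-then-unmarshal step appears in the transform policy below.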
rego_transform policies take incoming messages as input and return JSON patch operations to modify the data store of Enterprise OPA.
Policies might perform operations such as:
- Filtering out, or targeting operations at, messages from a particular topic
- Selecting only certain fields of a message for ingestion
- Switching between adding and removing data in the data store using the JSON patch syntax
An example policy which applies a transformation to messages from the users topic is shown below:
package e2e

import future.keywords.contains
import future.keywords.if

# Emit an "add" patch for each message on the "users" topic, storing the
# payload's name and roles fields under the path given by the payload's id.
transform contains {"op": "add", "path": payload.id, "value": val} if {
  input.topic == "users"
  payload := json.unmarshal(base64.decode(input.value))
  val := object.filter(payload, ["name", "roles"])
}
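The JSON patch syntax also allows removal, which is how a transform can delete previously ingested data. The following is a hedged sketch of a companion rule; the deleted flag is an invented message convention for illustration, not part of the Kafka input format:

package e2e

import future.keywords.contains
import future.keywords.if

# Sketch: emit a "remove" patch when a message marks a user as deleted.
# The "deleted" field is a hypothetical convention, not a Kafka attribute.
transform contains {"op": "remove", "path": payload.id} if {
  input.topic == "users"
  payload := json.unmarshal(base64.decode(input.value))
  payload.deleted == true
}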
Further reading
- Please see the tutorial on configuring Enterprise OPA with Kafka for an end-to-end example.