Produce Policy

Introduction

The produce policy interceptor enforces limits on produce requests sent to Kafka, so that every incoming message adheres to the configured specification.

What happens when sending an invalid request

Any request that doesn't match the interceptor's configuration is blocked, and the corresponding error message is returned to the client.

For example, suppose you send a record without headers while the interceptor is configured with recordHeaderRequired set to true.

When you send that request to the cluster, the following error is returned:

org.apache.kafka.common.errors.PolicyViolationException: Request parameters do not satisfy the configured policy. Headers are required

Configuration

| key | type | default | description |
| --- | --- | --- | --- |
| topic | String | .* | Topics that match this regex will have the interceptor applied. If no value is set, it is applied to all topics. |
| acks | Acks | | Configuration for acks modes. Note that a request with acks=0 is still blocked, but the client ignores the error because it does not wait for a response. |
| recordHeaderRequired | Boolean | | Configuration of header usage |
| compressions | Compression Type | | Configuration for compression types |
| idempotenceRequired | Boolean | | Configuration for idempotency usage |
| transactionRequired | Boolean | | Configuration for transaction usage |
| version | Version | | Configuration for produce version |
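
The topic setting is a regular expression, so it helps to see which topic names a given pattern selects. The following is a minimal Python sketch, not the gateway's implementation: the function name interceptor_applies is hypothetical, and it assumes the pattern must match the whole topic name. Python and Java regex syntax also differ in some details, so treat it only as an illustration for simple patterns.

```python
import re
from typing import Optional


def interceptor_applies(topic_pattern: Optional[str], topic: str) -> bool:
    """Return True if a policy with `topic_pattern` would cover `topic`.

    Assumption: the pattern has to match the full topic name.
    An unset pattern falls back to the documented default ".*".
    """
    pattern = topic_pattern if topic_pattern else ".*"
    return re.fullmatch(pattern, topic) is not None


if __name__ == "__main__":
    for name in ("topic-orders", "payments"):
        print(name, interceptor_applies("topic.*", name))
```

With the pattern "topic.*", a topic named topic-orders is covered while payments is not; leaving the pattern unset covers both.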

Acks

| key | type | default | description |
| --- | --- | --- | --- |
| value | Array[integer] | | Only these acks modes are allowed. Allowed values: -1, 0, 1. Note that a request with acks=0 is still blocked, but the client ignores the error because it does not wait for a response. |
| action | Action | BLOCK | Action to take if the value is outside the specified range. |
| throttleTimeMs | int | 100 | Value to throttle with (only applicable when action is set to THROTTLE). |

Boolean

| key | type | default | description |
| --- | --- | --- | --- |
| value | Boolean | | Value for the configuration. If action is OVERRIDE, this value is used as the override value. |
| action | Action | BLOCK | Action to take if the value is outside the specified range. |
| throttleTimeMs | int | 100 | Value to throttle with (only applicable when action is set to THROTTLE). |

Version

| key | type | default | description |
| --- | --- | --- | --- |
| min | int | | Minimum value of produce version |
| max | int | | Maximum value of produce version |
| action | Action | BLOCK | Action to take if the value is outside the specified range. |
| throttleTimeMs | int | 100 | Value to throttle with (only applicable when action is set to THROTTLE). |
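
The version rule amounts to a range check on the produce request version. The sketch below illustrates that check in Python under two assumptions that the table does not state: both bounds are inclusive, and a missing bound means no limit on that side. The function name version_in_range is hypothetical.

```python
from typing import Optional


def version_in_range(version: int,
                     min_version: Optional[int] = None,
                     max_version: Optional[int] = None) -> bool:
    """Check a produce version against optional min/max bounds.

    Assumptions: bounds are inclusive; None means "no bound".
    """
    if min_version is not None and version < min_version:
        return False
    if max_version is not None and version > max_version:
        return False
    return True


if __name__ == "__main__":
    # Bounds taken from a policy with min=1 and max=3.
    for v in (0, 1, 3, 4):
        print(v, version_in_range(v, min_version=1, max_version=3))
```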

Compression Type

| key | type | default | description |
| --- | --- | --- | --- |
| values | Set[Compression] | | Set of allowed compression types, given as strings. |
| action | Action | BLOCK | Action to take if the value is outside the specified range. |
| overrideValue | Compression | | Value to override with (only applicable when action is set to OVERRIDE). |
| throttleTimeMs | int | 100 | Value to throttle with (only applicable when action is set to THROTTLE). |

Compression

  • NONE
  • GZIP
  • SNAPPY
  • LZ4
  • ZSTD

Action

  • BLOCK → when validation fails, record the violation in the audit and return an error.
  • INFO → execute the request with the invalid value and record the violation in the audit.
  • THROTTLE → when validation fails, record the violation in the audit and throttle the request for throttleTimeMs milliseconds.
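
To make the difference between the three actions concrete, here is a small Python sketch of the decision they describe. It is an illustration only: Outcome and apply_action are hypothetical names, and the sketch does not model whether a throttled request is otherwise processed, because the list above does not say.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Outcome:
    error: Optional[str] = None  # error returned to the client (BLOCK only)
    throttle_ms: int = 0         # throttle time applied (THROTTLE only)
    audited: bool = False        # whether the violation was recorded


def apply_action(action: str, violation: Optional[str],
                 throttle_time_ms: int = 100) -> Outcome:
    """Map a policy violation to an outcome according to `action`."""
    if violation is None:
        return Outcome()  # request satisfies the policy, nothing to do
    if action == "BLOCK":
        return Outcome(error=violation, audited=True)
    if action == "INFO":
        return Outcome(audited=True)
    if action == "THROTTLE":
        return Outcome(throttle_ms=throttle_time_ms, audited=True)
    raise ValueError(f"unsupported action: {action}")


if __name__ == "__main__":
    for action in ("BLOCK", "INFO", "THROTTLE"):
        print(action, apply_action(action, "Headers are required"))
```

In all three cases the violation is audited; only BLOCK surfaces an error to the client, and only THROTTLE uses throttleTimeMs.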

Example

{
  "name": "myProducePolicyPlugin",
  "pluginClass": "io.conduktor.gateway.interceptor.safeguard.ProducePolicyPlugin",
  "priority": 100,
  "config": {
    "topic": "topic.*",
    "acks": {
      "value": [
        -1,
        0
      ],
      "action": "BLOCK"
    },
    "recordHeaderRequired": {
      "value": true,
      "action": "BLOCK"
    },
    "compressions": {
      "value": [
        "NONE",
        "GZIP"
      ],
      "action": "INFO"
    },
    "idempotenceRequired": {
      "value": true,
      "action": "INFO"
    },
    "transactionRequired": {
      "value": true
    },
    "version": {
      "min": 1,
      "max": 3,
      "action": "BLOCK"
    }
  }
}
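
Before deploying a configuration like this one, it can be worth checking it against the value lists documented on this page. The Python sketch below does that for a compact copy of the example. It is a local sanity check and not part of the gateway: find_problems is a hypothetical helper, and it accepts both "value" and "values" for compressions because the table and the example use different key names.

```python
import json

KNOWN_ACTIONS = {"BLOCK", "INFO", "THROTTLE", "OVERRIDE"}  # OVERRIDE appears in the tables
ALLOWED_ACKS = {-1, 0, 1}
COMPRESSIONS = {"NONE", "GZIP", "SNAPPY", "LZ4", "ZSTD"}

EXAMPLE = """
{"name": "myProducePolicyPlugin",
 "pluginClass": "io.conduktor.gateway.interceptor.safeguard.ProducePolicyPlugin",
 "priority": 100,
 "config": {
   "topic": "topic.*",
   "acks": {"value": [-1, 0], "action": "BLOCK"},
   "recordHeaderRequired": {"value": true, "action": "BLOCK"},
   "compressions": {"value": ["NONE", "GZIP"], "action": "INFO"},
   "idempotenceRequired": {"value": true, "action": "INFO"},
   "transactionRequired": {"value": true},
   "version": {"min": 1, "max": 3, "action": "BLOCK"}}}
"""


def find_problems(plugin: dict) -> list:
    """Return human-readable problems found in a produce policy config."""
    problems = []
    config = plugin.get("config", {})
    for key, rule in config.items():
        if not isinstance(rule, dict):
            continue  # e.g. "topic" is a plain string
        action = rule.get("action", "BLOCK")  # BLOCK is the documented default
        if action not in KNOWN_ACTIONS:
            problems.append(f"{key}: unknown action {action!r}")
    for ack in config.get("acks", {}).get("value", []):
        if ack not in ALLOWED_ACKS:
            problems.append(f"acks: unsupported value {ack!r}")
    comp = config.get("compressions", {})
    for name in comp.get("values", comp.get("value", [])):
        if name not in COMPRESSIONS:
            problems.append(f"compressions: unknown type {name!r}")
    version = config.get("version", {})
    if "min" in version and "max" in version and version["min"] > version["max"]:
        problems.append("version: min is greater than max")
    return problems


if __name__ == "__main__":
    print(find_problems(json.loads(EXAMPLE)))
```

An empty list means the configuration only uses documented actions, acks modes, and compression types, and that the version bounds are ordered correctly.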