Kafka
Kafka is an open-source distributed streaming platform designed for handling real-time data feeds and building scalable, fault-tolerant, and high-performance data pipelines. It is used for building and managing event-driven architectures, processing streaming data, and integrating various data systems and applications.
Kafka integration
Description
<kafkaIntegration> - command for integration with Kafka
Required parameters:
- alias - unique name of the integration (e.g., "MY_KAFKA")
- enabled - a boolean flag to enable/disable integration (can be “true” or “false”)
- bootstrapAddress - specifies the address and port of the Kafka cluster's bootstrap server
- autoOffsetReset - determines the behavior when a consumer initially subscribes to a topic or if the consumer's current offset becomes invalid or out of range. It specifies the offset to reset to in such scenarios (can be “earliest” or “latest”)
- maxPollRecords - specifies the maximum number of records that a consumer can fetch in a single poll request from a Kafka broker
- maxPollIntervalMs - specifies the maximum delay, in milliseconds, between successive poll calls; a consumer that does not poll again within this interval is considered failed, and its partitions are reassigned to other members of the group
- clientId - specifies an identifier for the Kafka client
- groupId - identifies a group of consumer instances that work together to consume messages from one or more topics
- autoCommitTimeout - specifies the time interval, in milliseconds, between automatic commits of consumed offsets by the consumer
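To relate these parameters to Kafka itself, the sketch below annotates each element with the standard Kafka consumer property it most likely corresponds to. The mapping is an assumption based on the names and descriptions above and is not confirmed by this page. The numeric values are Kafka's own consumer defaults, used here only for illustration.

```xml
<!-- Sketch only: the property mapping in the comments is assumed, not confirmed -->
<kafkaIntegration>
    <kafka alias="MY_KAFKA" enabled="true">
        <!-- assumed: bootstrap.servers -->
        <bootstrapAddress>localhost:9094</bootstrapAddress>
        <!-- assumed: auto.offset.reset; "latest" reads only messages produced after subscribing -->
        <autoOffsetReset>latest</autoOffsetReset>
        <!-- assumed: max.poll.records (Kafka default is 500) -->
        <maxPollRecords>500</maxPollRecords>
        <!-- assumed: max.poll.interval.ms (Kafka default is 300000) -->
        <maxPollIntervalMs>300000</maxPollIntervalMs>
        <!-- assumed: client.id -->
        <clientId>test-client</clientId>
        <!-- assumed: group.id -->
        <groupId>test-group</groupId>
        <!-- assumed: auto.commit.interval.ms (Kafka default is 5000) -->
        <autoCommitTimeout>5000</autoCommitTimeout>
    </kafka>
</kafkaIntegration>
```

The clientId and groupId values here are hypothetical names chosen to make the two identifiers easy to tell apart.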
<kafkaIntegration>
    <kafka alias="MY_KAFKA" enabled="false">
        <bootstrapAddress>localhost:9094</bootstrapAddress>
        <autoOffsetReset>earliest</autoOffsetReset>
        <maxPollRecords>2</maxPollRecords>
        <maxPollIntervalMs>1000</maxPollIntervalMs>
        <clientId>group</clientId>
        <groupId>group</groupId>
        <autoCommitTimeout>10000</autoCommitTimeout>
    </kafka>
</kafkaIntegration>
Kafka command
Description
<kafka> – a command that sends messages to Kafka and receives messages from it in test scenarios
Required parameters:
- comment - a short description of the test step
- alias - unique name of the integration
- commands - send or receive (only one type can be used in a single <kafka> step)
Optional parameters:
- condition - the condition that determines whether this test step is executed
- threshold - the maximum execution time of the command, in milliseconds; if the execution time exceeds this value, the step is marked as failed
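As an illustration of the optional parameters, the sketch below adds condition and threshold to a <kafka> step. It assumes that both are written as attributes next to comment and alias, which this page does not confirm. The condition name isWeatherEnabled and the threshold value are hypothetical.

```xml
<!-- Sketch only: attribute placement is assumed; "isWeatherEnabled" is a hypothetical condition name -->
<kafka comment="Send a message within 3 seconds"
       alias="MY_KAFKA"
       condition="isWeatherEnabled"
       threshold="3000">
    <send topic="weather">
        <file>body_1.json</file>
    </send>
</kafka>
```

With these settings the step would presumably be skipped when the condition does not hold, and marked as failed if sending takes longer than 3000 milliseconds.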
Send command
<kafka comment="Send a message" alias="KAFKA_0">
    <send topic="weather" correlationId="1">
        <value>
            [ "city": "Houston" ]
        </value>
        <key>7</key>
        <headers>
            <header name="Lviv" value="20"/>
            <header name="Odessa" value="25"/>
        </headers>
    </send>
    <send topic="weather" correlationId="1">
        <file>body_1.json</file>
    </send>
</kafka>
The send command is used to send a message to Kafka. The command has arguments:
- [required] topic - specifies the target topic to which the message will be sent (e.g., "weather")
- [optional] correlationId - specifies the correlation ID, a unique identifier associated with the message being sent (e.g., "1")
- value - specifies the actual content or payload of the message being sent
- key - specifies a key associated with the message being sent
- headers - specifies additional metadata or custom attributes associated with the message being sent
- file - specifies the name or path of a file that contains the content or payload of the message being sent
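Since topic is the only argument marked as required, a send can presumably be reduced to a topic and a payload. The sketch below shows that minimal form with an inline value. It assumes that key, headers, and correlationId may all be omitted together, which this page implies but does not state outright.

```xml
<!-- Sketch only: assumes key, headers, and correlationId can all be left out -->
<kafka comment="Send a minimal message" alias="KAFKA_0">
    <send topic="weather">
        <value>
            [ "city": "Houston" ]
        </value>
    </send>
</kafka>
```

The payload is copied as written in the send example on this page. Replacing the value element with a file element would load the payload from a file instead.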
Receive command
<kafka comment="Receive 1 time" alias="KAFKA_0">
    <receive topic="hundred" timeoutMillis="5000" commit="true" headers="false">
        <file>expected_226.json</file>
    </receive>
    <receive topic="hundred" timeoutMillis="5000" commit="true" headers="false">
        <value>
            [ "city": "Houston" ]
        </value>
    </receive>
</kafka>
The receive command is used to receive a message from Kafka. The command has arguments:
- [required] topic - specifies the topic from which the message will be received (e.g., “hundred”)
- [optional] timeoutMillis - specifies the maximum time, in milliseconds, that the consumer will wait for a message before timing out (e.g., "5000"; “1500” by default if not specified)
- [optional] commit - specifies whether the consumer commits the offset of the received message (can be “true” or “false”; “false” by default if not specified)
- [optional] headers - indicates whether the received message includes headers (can be “true” or “false”; “true” by default if not specified)
- file - specifies the name or path of a file containing the expected message, which is used for comparison
- value - defines the expected content or payload of the received Kafka message
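To show how the defaults listed above play out, the sketch below omits every optional attribute. Under the documented defaults this should behave as if timeoutMillis="1500", commit="false", and headers="true" had been written explicitly. The file name expected_weather.json is hypothetical.

```xml
<!-- Sketch only: optional attributes omitted, so the documented defaults apply
     (timeoutMillis="1500", commit="false", headers="true") -->
<kafka comment="Receive with default settings" alias="KAFKA_0">
    <receive topic="weather">
        <!-- hypothetical file holding the expected message -->
        <file>expected_weather.json</file>
    </receive>
</kafka>
```

Because commit defaults to “false”, the offset is presumably left uncommitted, so a later receive in the same consumer group may see the same message again.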