Solved

Kafka consumer Offset from beginning every batch

  • 5 March 2024
  • 4 replies
  • 41 views

Hello,

I'm currently working on a simple pipeline to ingest Kafka messages into a log file.

I'm trying to consume all the data from the beginning of a topic, but I'm only getting the new data added to the topic. Once consumed, the earlier messages in the topic are no longer accessible.

I've already tested all the different "Auto Offset Reset" options, for both the single-topic and multi-topic consumers.
The official documentation (docs.streamsets.com) lists the following Kafka consumer properties:

  • auto.commit.interval.ms
  • bootstrap.servers
  • enable.auto.commit
  • group.id
  • max.poll.records

If I understand correctly, all of those parameters are locked, so I can't disable the offset management and process all the data from the beginning of the topic.

Is there an additional Kafka configuration property to use, or do I need to configure the topic directly via the Kafka CLI?

StreamSets Data Collector version : 3.14.0
Kafka Consumer version : 2.0.0

Regards.


Best answer by Clément Vi 11 March 2024, 11:12


4 replies

You're right, Kafka keeps track of the offsets of all its consumers. If you want to reset offsets in Kafka, you need to call Kafka directly. But if you have the Kafka client installed on your SDC machine, you can use a Shell command in the Start Event of your pipeline to perform this reset.

Then, in the Script field of the Start Event tab, put something like:

/foobar/kafka/kafka_2.13-3.5.1/bin/kafka-consumer-groups.sh --bootstrap-server mykafka:9093 --group streamsetsDataCollector --topic mytopic --reset-offsets --to-earliest --execute;
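
For reference, one way to check that the reset took effect is to describe the consumer group before starting the pipeline. This is only a sketch that reuses the placeholder installation path, broker address, and group name from the command above; adapt them to your environment:

# Show the committed offset and lag of each partition for the StreamSets consumer group
/foobar/kafka/kafka_2.13-3.5.1/bin/kafka-consumer-groups.sh --bootstrap-server mykafka:9093 --group streamsetsDataCollector --describe

After a reset to earliest, the committed offsets should point back to the earliest available offset of each partition.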


Hello @roma,

 

Thank you for the provided solution.

Indeed this is working, but I’ve encountered a side effect.

Because the script is executed for each record, Kafka is overwhelmed with offset reset requests.

Is there a solution to only execute it once per batch?

 

Regards.

Hmm, why would the script execute for each record? It's supposed to run only once, when you start the pipeline. How many instances of this pipeline are you running? Do you start them from another pipeline using the Start Jobs stage?

My bad, I was trying that via a Shell Executor Stage.

 

You are right, the Start Event is a viable solution if you want to execute a command each time the pipeline is started.

 

What I need is to execute a command once per batch, so I've done it like this:

On the Jython Processor :

  • Produce Events : Activated
  • Record Processing Mode : Batch by Batch
# Create the event with the specified type and version
new_event = sdc.createEvent('custom_event', 1)
# Send the event to StreamSets Data Collector
sdc.toEvent(new_event)

# Pass every record of the batch through unchanged
for record in sdc.records:
    try:
        sdc.output.write(record)
    except Exception as e:
        # Send the record to the error stream
        sdc.error.write(record, str(e))

This code generates an event for every batch, and the event is used to execute the previously provided Kafka command (to be adapted as needed):

/foobar/kafka/kafka_2.13-3.5.1/bin/kafka-consumer-groups.sh --bootstrap-server mykafka:9093 --group streamsetsDataCollector --topic mytopic --reset-offsets --to-earliest --execute;

It also outputs all the records of the batch.
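
Since the reset now runs repeatedly, it can be worth previewing what it will do before wiring the event to an executor. A minimal sketch, assuming the same placeholder installation path, broker, group, and topic as above; with --dry-run the tool only prints the offsets it would reset, without applying them:

# Preview the reset without changing any committed offsets
/foobar/kafka/kafka_2.13-3.5.1/bin/kafka-consumer-groups.sh --bootstrap-server mykafka:9093 --group streamsetsDataCollector --topic mytopic --reset-offsets --to-earliest --dry-run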

 

Thank you for your support.

 

Regards.

 
