I'm currently working on a simple pipeline to ingest Kafka messages into a log file.
I'm trying to consume all the data from the beginning of a topic, but I'm only getting the newer data added to the topic. Once consumed, the earlier messages are no longer accessible.
I've already tested all the different "Auto Offset Reset" properties, for both the simple and multi-topic consumers. The official documentation (docs.streamsets.com) lists these properties: auto.commit.interval.ms, bootstrap.servers, enable.auto.commit, group.id, max.poll.records.
If I understand correctly, all of those parameters are locked, so I can't disable offset management and process all the data from the beginning of a topic.
Is there an additional Kafka configuration property to use, or do I need to configure the topic directly via the Kafka CLI?
StreamSets Data Collector version: 3.14.0
Kafka Consumer version: 2.0.0
Regards.
You’re right, Kafka as a system keeps track of the offsets for all its consumers. If you want to reset offsets in Kafka, you need to call Kafka directly. But if you have the Kafka client installed on your SDC machine, you can use a Shell command in the Start Event of your pipeline and do this reset.
Then, in the Script field of the Start Event tab, put something like:
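A minimal sketch of such a script, assuming the Kafka CLI tools are on the PATH; the broker address, consumer group, and topic name below are placeholders to adapt to your setup:

# Sketch: reset the pipeline's consumer group to the earliest offsets before consuming.
# Placeholder values: adjust the broker address, group id, and topic name.
kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --group my-sdc-consumer-group \
  --topic my-topic \
  --reset-offsets --to-earliest --execute

Note that Kafka only accepts an offset reset while the consumer group has no active members, so the group must be inactive when the command runs.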
Indeed, this works, but I’ve encountered a side effect.
Because the script is executed for each record, Kafka is overwhelmed with offset reset requests.
Is there a solution to execute it only once per batch?
Regards.
Hmm, why would the script execute for each record? It’s supposed to run only once, when you start the pipeline. How many instances of this pipeline are you running? Do you start them from another pipeline using the Start Jobs stage?
My bad, I was trying that via a Shell Executor stage.
You are right, the Start Event is a viable solution if you want to execute a command each time the pipeline is started.
What I need is to execute a command once per batch, so I’ve done it like this:
On the Jython processor:
Produce Events: Activated
Record Processing Mode: Batch by Batch
# Create the event with the specified type and data
new_event = sdc.createEvent('custom_event', 1)
# Send the event to StreamSets Data Collector
sdc.toEvent(new_event)

# SIMPLE OUTPUT
for record in sdc.records:
    try:
        sdc.output.write(record)
    except Exception as e:
        # Send record to error
        sdc.error.write(record, str(e))
This code generates an event for every batch, which then triggers the previously provided Kafka command (it needs to be adapted):
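As a sketch of the wiring, assuming the Jython processor's event output is routed to a Shell Executor whose Script field runs the adapted command (the values below are the same placeholders as above):

# Runs once per 'custom_event' record, i.e. once per batch.
# Placeholder command: adapt the broker, group, topic, and reset options to your needs.
kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --group my-sdc-consumer-group \
  --topic my-topic \
  --reset-offsets --to-earliest --execute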