Can someone please help me find a solution for this?
I have a Kafka origin followed by a Jython Evaluator.
I need to use Jython to check the incoming records from Kafka and stop the pipeline when there are no more Kafka messages.
Init Script: state['first_batch'] = "true"
if (state['first_batch'] == "false" and len(records) == 0):
    sdc.log.info("No more Kafka messages to consume. Stopping pipeline. See ya!")
for record in sdc.records:
    try:
        # Write record to processor output
        sdc.output.write(record)
    except Exception as e:
        # Send record to error
        sdc.error.write(record, str(e))
if (state['first_batch'] == "true" and len(records) > 0):
    state['first_batch'] = "false"
Sometimes the Jython script works, and sometimes the pipeline keeps running indefinitely even when there are no events.
If I trigger the job once, it works fine, and the second time is also fine. But from the third run in a day onward, I see this problem: the pipeline doesn't stop even though Kafka has no events.
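To make the intended behaviour easier to check outside Data Collector, the flag logic above can be simulated in plain Python. This is only a sketch: `should_stop` and `simulate` are hypothetical helper names, batches are modelled as plain lists, and no Data Collector API is called, so it says nothing about how the pipeline is actually stopped.

```python
# Plain-Python simulation of the first_batch flag logic.
# should_stop and simulate are hypothetical names used for illustration only.

def should_stop(state, batch):
    """Apply the flag logic to one batch; return True when a stop would be requested."""
    if state['first_batch'] == "false" and len(batch) == 0:
        return True
    if state['first_batch'] == "true" and len(batch) > 0:
        state['first_batch'] = "false"
    return False


def simulate(batches):
    """Return the index of the batch at which a stop is requested, or None."""
    state = {'first_batch': "true"}  # same value the init script sets
    for index, batch in enumerate(batches):
        if should_stop(state, batch):
            return index
    return None


# Messages arrive first, then the topic runs dry.
print(simulate([["m1", "m2"], ["m3"], []]))  # prints 2
# The topic is already empty when the run starts.
print(simulate([[], [], []]))  # prints None
```

In this simulation, a run whose batches are all empty never reaches the stop condition, because the flag only changes to "false" after a non-empty batch. Whether that is what happens in the failing runs is an assumption and has not been confirmed.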