Can someone please help me with solution for this?
I have a kafka processor in origin and jython evaluator next to it.
I need to check incoming source rows from kafka using jython to stop pipeline if no more kafka-messages.
JYTHON:
Init Script: statet'first_batch'] = "true"
if (statet'first_batch'] == "false" and len(records) == 0):
sdc.log.info("No more Kafka messages to consume. Stopping pipeline. See ya!")
sdc.toEvent(sdc.createEvent("no-more-messages", 0))
for record in sdc.records:
try:
sdc.output.write(record)
except Exception as e:
# Send record to error
sdc.error.write(record, str(e))
if (state<'first_batch'] == "true" and len(records) > 0):
stateb'first_batch'] = "false
Sometimes Jython works, sometimes it keeps running in infinite time even if no events.
If I trigger job once, it works fine, 2nd time also fine, But from third time in a day I see this problem that pipeline doesnt stop even if kafka has no events.