Hi Team,
I have a Kafka origin in my pipeline with a Jython Evaluator right after it.
I need to check the incoming records from Kafka in Jython and stop the pipeline when there are no more Kafka messages. Can someone help me with more details on what I can configure on the Kafka side?
I wrote the code at the end of this post, and I have two questions:
-> Can someone explain the exact difference between len(records) and len(sdc.records)? (In my present code I use len(sdc.records).)
-> Any hints on converting the same script to Groovy would also help.
Init Script:
    state['first_batch'] = "true"

Script:
    # An empty batch after at least one non-empty batch is treated as "no more messages".
    if (state['first_batch'] == "false" and len(records) == 0):
        sdc.log.info("No more Kafka messages to consume. Stopping pipeline. See ya!")
        sdc.toEvent(sdc.createEvent("no-more-messages", 0))

    for record in sdc.records:
        try:
            sdc.output.write(record)
        except Exception as e:
            # Send record to error
            sdc.error.write(record, str(e))

    # Remember that the first non-empty batch has been seen.
    if (state['first_batch'] == "true" and len(records) > 0):
        state['first_batch'] = "false"
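To make the intent of the flag easier to check outside Data Collector, here is a minimal standalone sketch of the same first-batch logic in plain Python. It does not call any StreamSets API: `process_batch`, the `state` dict, and the `events` list are hypothetical stand-ins for the evaluator script, the state object, and the event stream, and the batches are made-up sample data.

```python
# Standalone simulation of the first-batch flag logic; no StreamSets APIs are used.
# process_batch, state and events are hypothetical stand-ins for illustration only.

def process_batch(records, state, events):
    """Return the records to pass downstream; record an event when the source looks drained."""
    # An empty batch after at least one non-empty batch is treated as "no more messages".
    if state["first_batch"] == "false" and len(records) == 0:
        events.append("no-more-messages")

    output = list(records)  # pass every record through unchanged

    # Flip the flag once the first non-empty batch has been seen.
    if state["first_batch"] == "true" and len(records) > 0:
        state["first_batch"] = "false"
    return output


state = {"first_batch": "true"}  # what the init script sets up
events = []

process_batch([], state, events)            # empty batch before any data arrives
process_batch(["m1", "m2"], state, events)  # first non-empty batch flips the flag
process_batch([], state, events)            # empty batch after data was seen
print(state, events)
```

In this sketch only the third call appends an event: the first empty batch is ignored because the flag is still "true", which is why the flag is needed to avoid stopping a pipeline that has not received anything yet.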