Question

Kafka doubt

  • 13 January 2023
  • 0 replies
  • 46 views

Can someone please help me with solution for this?

 

I have a kafka processor in origin and jython evaluator next to it.

I need to check incoming source rows from kafka using jython to stop pipeline if no more kafka-messages.

 

JYTHON:


Init Script: state['first_batch'] = "true"

if (state['first_batch'] == "false" and len(records) == 0):
  sdc.log.info("No more Kafka messages to consume. Stopping pipeline. See ya!")
  sdc.toEvent(sdc.createEvent("no-more-messages", 0))

for record in sdc.records:
  try:
    sdc.output.write(record)
  except Exception as e:
    # Send record to error
    sdc.error.write(record, str(e))

if (state['first_batch'] == "true" and len(records) > 0):
  state['first_batch'] = "false

 

Sometimes Jython works, sometimes it keeps running in infinite time even if no events.

If I trigger job once, it works fine, 2nd time also fine, But from third time in a day I see this problem that pipeline doesnt stop even if kafka has no events.


0 replies

Be the first to reply!

Reply