Skip to main content
Question

Kafka and Jython Evaluator Query

  • September 29, 2022
  • 1 reply
  • 73 views

Hi Team,

 

I have a kafka processor in origin and jython evaluator next to it.

I need to check incoming source rows from kafka using jython to stop pipeline if no more kafka-messages. Can someone help me with more details I can configure on kafka?

 

I wrote code:

→ But can someone give me exact difference between len(records) and len(sdc.records)

{ In present code I mentioned len(sdc.records) }

-->Also any hints to convert the same into groovy-code.

Init Script: state['first_batch'] = "true"

if (state['first_batch'] == "false" and len(records) == 0):
  sdc.log.info("No more Kafka messages to consume. Stopping pipeline. See ya!")
  sdc.toEvent(sdc.createEvent("no-more-messages", 0))

for record in sdc.records:
  try:
    sdc.output.write(record)
  except Exception as e:
    # Send record to error
    sdc.error.write(record, str(e))

if (state['first_batch'] == "true" and len(records) > 0):
  state['first_batch'] = "false"

1 reply

Bikram
Headliner
Forum|alt.badge.img+1
  • Headliner
  • 486 replies
  • October 3, 2022

@Priyanka Mynepally 

 

for record in sdc.records:

 

sdc.records contains all data into it from your previous processors output .  Might be in a list .

Here “record” is fetching one by one record from the list .

 

e.g if the Origin/previous processor sent 100 records to Jython , then sdc.records length is 100.

 

Then record in sdc.records means it will retrieve one by one record.

record.length is the length of individual record.

 

This is my understanding , correct me if i am wrong here.

 

 

 

 

 

 


Reply