
Hi Team,

 

I have a Kafka origin followed by a Jython Evaluator.

I need to check the incoming records from Kafka in Jython and stop the pipeline when there are no more Kafka messages. Can someone help me with any further details I can configure on the Kafka side?

 

I wrote the code below, and I have two questions:

→ Can someone explain the exact difference between len(records) and len(sdc.records)?

(In my present code I use len(sdc.records).)

→ Also, any hints on converting the same code to Groovy?

Init Script: sdc.state['first_batch'] = "true"

if sdc.state['first_batch'] == "false" and len(sdc.records) == 0:
    sdc.log.info("No more Kafka messages to consume. Stopping pipeline. See ya!")
    sdc.toEvent(sdc.createEvent("no-more-messages", 0))

for record in sdc.records:
    try:
        sdc.output.write(record)
    except Exception as e:
        # Send record to error
        sdc.error.write(record, str(e))

if sdc.state['first_batch'] == "true" and len(sdc.records) > 0:
    sdc.state['first_batch'] = "false"
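To make the intent of the first_batch flag easier to check, here is a minimal sketch that replays the same logic outside the pipeline. It assumes a plain dict in place of the pipeline state and plain lists in place of Kafka batches; process_batch and the sample batches are hypothetical names used only for illustration, not part of the StreamSets API.

```python
def process_batch(state, batch):
    """Mirror the first_batch logic on a plain dict and a plain list.

    Returns True when the "no-more-messages" event would be sent.
    """
    # Stop only if at least one non-empty batch was seen earlier
    # and the current batch is empty.
    stop = state["first_batch"] == "false" and len(batch) == 0

    # Flip the flag once the first non-empty batch arrives.
    if state["first_batch"] == "true" and len(batch) > 0:
        state["first_batch"] = "false"
    return stop


state = {"first_batch": "true"}            # what the init script sets up
batches = [[], ["m1", "m2"], ["m3"], []]   # hypothetical sequence of batches
decisions = [process_batch(state, b) for b in batches]
print(decisions)  # [False, False, False, True]
```

Note that an empty batch arriving before any data does not trigger the stop; only an empty batch after the first non-empty one does.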

@Priyanka Mynepally 

 

for record in sdc.records:

 

sdc.records contains all the records output by the previous stage, most likely as a list.

Here "record" takes each record from that list, one at a time.

E.g. if the origin or previous processor sent 100 records to the Jython Evaluator, then the length of sdc.records is 100.

So "for record in sdc.records" retrieves the records one by one.

The length of an individual record (for example, how many fields it has) is a separate thing from len(sdc.records).

This is my understanding; correct me if I am wrong here.
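The distinction between the batch size and the size of a single record can be sketched as follows. This is only an illustration that runs outside Data Collector: FakeRecord and FakeSdc are hypothetical stand-ins for the objects the pipeline injects, and using record.value to reach a record's fields is an assumption about the scripting API.

```python
class FakeRecord(object):
    """Hypothetical stand-in for a pipeline record."""
    def __init__(self, value):
        self.value = value  # assumed accessor for the record's fields


class FakeSdc(object):
    """Hypothetical stand-in for the injected sdc object."""
    def __init__(self, records):
        self.records = records  # the current batch, as a list


def batch_summary(sdc):
    """Return (number of records in the batch, field count per record)."""
    batch_size = len(sdc.records)                        # size of the batch
    field_counts = [len(r.value) for r in sdc.records]   # size of each record
    return batch_size, field_counts


sdc = FakeSdc([FakeRecord({"id": i, "msg": "hello"}) for i in range(100)])
size, counts = batch_summary(sdc)
print(size)       # 100
print(counts[0])  # 2
```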

 

 

 

 

 

 

