Question

Error on kafka consumer

  • 23 May 2022
  • 5 replies
  • 154 views

Hi,

 

I try to combine a datacollector and a transformer pipeline (due to the missing parquet s3 support in datacollector). I read data from a Postgres db and publish it to Kafka using the datacollector. I have a Kafka connection that I want to use for both pipelines. Pushing Data to Kafka works but when I run the transfom pipeline (using the same connection) I get the following error:

 

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, 53f742dd118a, executor driver): java.lang.NoSuchMethodError: ‘void org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumerPool$PoolConfig.setEvictionPolicy(org.apache.commons.pool2.impl.EvictionPolicy)'	at org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumerPool$PoolConfig.init(InternalKafkaConsumerPool.scala:191)	at org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumerPool$PoolConfig.<init>(InternalKafkaConsumerPool.scala:162)	at org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumerPool.<init>(InternalKafkaConsumerPool.scala:53)	at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer$.<init>(KafkaDataConsumer.scala:606)	at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer$.<clinit>(KafkaDataConsumer.scala)	at org.apache.spark.sql.kafka010.KafkaSourceRDD.compute(KafkaSourceRDD.scala:72)	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)	at org.apache.spark.sql.execution.SQLExecutionRDD.compute(SQLExecutionRDD.scala:55)	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)	at org.apache.spark.scheduler.Task.run(Task.scala:127)	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)	at java.base/java.lang.Thread.run(Thread.java:834)Driver stacktrace:	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)	at scala.Option.foreach(Option.scala:407)	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2120)	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2139)	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:467)	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:420)	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3627)	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2697)	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)	at org.apache.spark.sql.Dataset.head(Dataset.scala:2697)	at com.streamsets.dataformat.spark.json.JsonParserGenerator$.parse(JsonParserGenerator.scala:25)	at com.streamsets.pipeline.spark.origin.kafka.KafkaOrigin.generate(KafkaOrigin.scala:150)	at com.streamsets.datatransformer.api.operator.Origin.lambda$getOrCreate$0(Origin.java:17)	at com.streamsets.datatransformer.api.operator.Operator.generateDF(Operator.java:95)	at com.streamsets.datatransformer.api.operator.Operator.lambda$getOrCreate$0(Operator.java:89)	at java.base/java.util.Optional.orElseGet(Optional.java:369)	at com.streamsets.datatransformer.api.operator.Operator.getOrCreate(Operator.java:89)	at com.streamsets.datatransformer.api.operator.Origin.getOrCreate(Origin.java:15)	at com.streamsets.datatransformer.api.spark.SparkOrigin.getOrCreate(SparkOrigin.java:27)	at com.streamsets.datatransformer.dag.BaseBatchDAGRunner.$anonfun$generateDataRefs$2(BaseBatchDAGRunner.scala:608)	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)	at scala.collection.TraversableLike.map(TraversableLike.scala:238)	at scala.collection.TraversableLike.map$(TraversableLike.scala:231)	at scala.collection.AbstractTraversable.map(Traversable.scala:108)	at com.streamsets.datatransformer.dag.BaseBatchDAGRunner.$anonfun$generateDataRefs$1(BaseBatchDAGRunner.scala:602)	at scala.collection.mutable.LinkedHashMap.foreach(LinkedHashMap.scala:143)	at com.streamsets.datatransformer.dag.BaseBatchDAGRunner.generateDataRefs(BaseBatchDAGRunner.scala:597)	at com.streamsets.datatransformer.dag.BaseBatchDAGRunner.run(BaseBatchDAGRunner.scala:552)	at com.streamsets.pipeline.spark.dag.SparkBatchDAGRunner.run(SparkBatchDAGRunner.scala:83)	at com.streamsets.pipeline.spark.dag.SparkBatchDAGRunner.run(SparkBatchDAGRunner.scala:43)	at com.streamsets.datatransformer.dag.runner.DataTransformerRunner.startDataTransformerDagRunner(DataTransformerRunner.java:483)	at com.streamsets.datatransformer.dag.runner.DataTransformerRunner.start(DataTransformerRunner.java:262)	at com.streamsets.datacollector.execution.runner.common.AsyncRunner.lambda$start$3(AsyncRunner.java:150)	at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.lambda$call$0(SafeScheduledExecutorService.java:214)	at com.streamsets.datacollector.security.GroupsInScope.execute(GroupsInScope.java:43)	at com.streamsets.datacollector.security.GroupsInScope.execute(GroupsInScope.java:24)	at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.call(SafeScheduledExecutorService.java:210)	at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.lambda$call$0(SafeScheduledExecutorService.java:214)	at com.streamsets.datacollector.security.GroupsInScope.execute(GroupsInScope.java:43)	at com.streamsets.datacollector.security.GroupsInScope.execute(GroupsInScope.java:24)	at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.call(SafeScheduledExecutorService.java:210)	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)	at com.streamsets.datacollector.metrics.MetricSafeScheduledExecutorService$MetricsTask.run(MetricSafeScheduledExecutorService.java:88)	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)	at java.base/java.lang.Thread.run(Thread.java:834)Caused by: java.lang.NoSuchMethodError: 'void org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumerPool$PoolConfig.setEvictionPolicy(org.apache.commons.pool2.impl.EvictionPolicy)'	at org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumerPool$PoolConfig.init(InternalKafkaConsumerPool.scala:191)	at org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumerPool$PoolConfig.<init>(InternalKafkaConsumerPool.scala:162)	at org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumerPool.<init>(InternalKafkaConsumerPool.scala:53)	at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer$.<init>(KafkaDataConsumer.scala:606)	at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer$.<clinit>(KafkaDataConsumer.scala)	at org.apache.spark.sql.kafka010.KafkaSourceRDD.compute(KafkaSourceRDD.scala:72)	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)	at org.apache.spark.sql.execution.SQLExecutionRDD.compute(SQLExecutionRDD.scala:55)	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)	at org.apache.spark.scheduler.Task.run(Task.scala:127)	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)	... 3 more

5 replies

Userlevel 3
Badge

hi @mstoffel, actually, if you can use local storage as a “staging” area you can use Data Collector to convert Avro files to Parquet before you load to a destination data lake of choice.

Check this chapter in the manual: https://docs.streamsets.com/portal/platform-datacollector/latest/datacollector/UserGuide/Processors/WholeFileTransformer.html#concept_nwg_rx4_l2b

If you need further help let us know via DM.

Thanks for the answer, will try that.

but what about the error?

Really cumbersome, I will not use that. To read from a database and write to parquet I have to use two pipelines. One to write Avro files to a local dir in the container. A second to again read those files and write it to S3 as parquet. How to delete tmp files etc…... 

 

Userlevel 2
Badge

Hi @mstoffel You can use the “Post processing feature” in the directory origin to delete the files once it is processed.

https://docs.streamsets.com/portal/datacollector/latest/help/datacollector/UserGuide/Origins/Directory.html#task_o5v_bp1_n3b

About the error:

Caused by: java.lang.NoSuchMethodError: 'void org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumerPool$PoolConfig.setEvictionPolicy(org.apache.commons.pool2.impl.EvictionPolicy)'

Generally this error message occurs if you have any jar/version mismatch.

@mstoffel @Ranjith P I’ve had the same (similar?) error and fixed it with:

wget https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.11.0/commons-pool2-2.11.0.jar
sudo mv commons-pool2-2.11.0.jar /opt/streamsets/spark-3.0.3-bin-hadoop3.2/jars/

Looks like `org.apache.commons.pool2.impl.EvictionPolicy` was introduced to this lib in 2.11.0 version …

https://github.com/apache/commons-pool/blob/master/src/main/java/org/apache/commons/pool2/impl/EvictionPolicy.java

Reply