Question

Mongodb origin error

  • 26 July 2022
  • 2 replies
  • 64 views

Hi,

I am trying to read from MongoDB. I have set up the Scala code below for the pipeline:

val df = spark.read
  .format("com.mongodb.spark.sql.DefaultSource")
  .option("spark.mongodb.input.uri",
    "mongodb://mu:mp@mh:port/md.cname?authSource=mad&readPreference=primary&ssl=false")
  .load()

where the abbreviations stand for

mu - mongodb.username
mp - mongodb.password
mh - mongodb.host
md - mongodb.database
cname - collection name
mad - mongodb.authentication-database
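For reference, substituting those placeholders gives a connection string of the following shape. This is a minimal sketch to show how the parts fit together; buildUri is a hypothetical helper and the values are the post's placeholders, not real credentials:

```scala
object MongoUriSketch {
  // Assemble the MongoDB connection URI from the placeholder parts above.
  // All argument values below are the post's placeholders, not real credentials.
  def buildUri(mu: String, mp: String, mh: String, port: Int,
               md: String, cname: String, mad: String): String =
    s"mongodb://$mu:$mp@$mh:$port/$md.$cname" +
      s"?authSource=$mad&readPreference=primary&ssl=false"

  def main(args: Array[String]): Unit =
    println(buildUri("mu", "mp", "mh", 27017, "md", "cname", "mad"))
}
```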

 

Error:

TRANSFORMER_00 - TRANSFORMER_00 - Failed at operator: Operator: [Type: class com.streamsets.pipeline.spark.transform.scala.ScalaTransform id: Scala_01]: Class: Failed to find data source: com.mongodb.spark.sql.DefaultSource. Please find packages at http://spark.apache.org/third-party-projects.html was not found. This usually happens when running on an unsupported version of Spark, The offsets for failure occurred was 'Unknown'
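The "Failed to find data source" message generally means the MongoDB Spark connector jar is not on the Spark classpath. Outside of Transformer, the usual fix is to pull the connector in as a package at submit time; the coordinates below are an assumption for a Spark 2.4 / Scala 2.11 cluster, and `your-app.jar` is a placeholder, so adjust both to match your environment:

```shell
# Pull the MongoDB Spark connector from Maven Central at submit time.
# The version (Scala 2.11 / connector 2.4.2) is an assumption --
# match it to the Scala and Spark versions of your cluster.
spark-submit \
  --packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.2 \
  your-app.jar
```

Within Transformer, the equivalent is to register the connector as an external library on the stage, as described in the reply below.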

 


2 replies

@Giuseppe Mura Thank you, but I don't see the External Libraries option as shown below:

I am unable to connect to StreamSets today as it's unstable, but the error I now have is below:

java.lang.NullPointerException
at com.streamsets.pipeline.spark.transform.scala.ScalaTransform.transform(ScalaTransform.scala:62)
at com.streamsets.datatransformer.api.operator.Transform.lambda$getOrTransform$0(Transform.java:17)
at com.streamsets.datatransformer.api.operator.Operator.generateDF(Operator.java:95)
at com.streamsets.datatransformer.api.operator.Operator.lambda$getOrCreate$0(Operator.java:89)
at java.util.Optional.orElseGet(Optional.java:267)
at com.streamsets.datatransformer.api.operator.Operator.getOrCreate(Operator.java:89)
at com.streamsets.datatransformer.api.operator.Transform.getOrTransform(Transform.java:15)
at com.streamsets.datatransformer.api.spark.SparkTransform.getOrTransform(SparkTransform.java:27)
at com.streamsets.datatransformer.dag.BaseBatchDAGRunner$$anonfun$generateDataRefs$1.apply(BaseBatchDAGRunner.scala:626)
at com.streamsets.datatransformer.dag.BaseBatchDAGRunner$$anonfun$generateDataRefs$1.apply(BaseBatchDAGRunner.scala:597)
at scala.collection.mutable.LinkedHashMap.foreach(LinkedHashMap.scala:141)
at com.streamsets.datatransformer.dag.BaseBatchDAGRunner.generateDataRefs(BaseBatchDAGRunner.scala:597)
at com.streamsets.datatransformer.dag.BaseBatchDAGRunner.run(BaseBatchDAGRunner.scala:552)
at com.streamsets.pipeline.spark.dag.SparkBatchDAGRunner.run(SparkBatchDAGRunner.scala:83)
at com.streamsets.pipeline.spark.dag.SparkBatchDAGRunner.run(SparkBatchDAGRunner.scala:43)
at com.streamsets.datatransformer.dag.runner.DataTransformerRunner.startDataTransformerDagRunner(DataTransformerRunner.java:483)
at com.streamsets.datatransformer.dag.runner.DataTransformerRunner.start(DataTransformerRunner.java:262)
at com.streamsets.datacollector.execution.runner.common.AsyncRunner.lambda$start$3(AsyncRunner.java:150)
at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.lambda$call$0(SafeScheduledExecutorService.java:214)
at com.streamsets.datacollector.security.GroupsInScope.execute(GroupsInScope.java:43)
at com.streamsets.datacollector.security.GroupsInScope.execute(GroupsInScope.java:24)
at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.call(SafeScheduledExecutorService.java:210)
at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.lambda$call$0(SafeScheduledExecutorService.java:214)
at com.streamsets.datacollector.security.GroupsInScope.execute(GroupsInScope.java:43)
at com.streamsets.datacollector.security.GroupsInScope.execute(GroupsInScope.java:24)
at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.call(SafeScheduledExecutorService.java:210)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at com.streamsets.datacollector.metrics.MetricSafeScheduledExecutorService$MetricsTask.run(MetricSafeScheduledExecutorService.java:88)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

 

@Dimple , have you loaded the MongoDB driver against the Spark Basic stage library? If you click on the Scala stage, you will be able to see/add the required External Libs. 
 

Please refer to the manual for methods of installation of external libs: https://docs.streamsets.com/portal/transformer/latest/help/transformer/Installation/ExternalLibs.html#concept_s3h_4rv_pkb
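If the External Libs option is not visible in the UI, the connector jar can be fetched from Maven Central and then installed through one of the methods in the linked docs. The version below (connector 2.4.2 for Scala 2.11) is an assumption; pick the build that matches your cluster's Scala and Spark versions:

```shell
# Download the MongoDB Spark connector jar from Maven Central.
# Version 2.4.2 for Scala 2.11 is an assumption -- adjust as needed.
curl -fLO https://repo1.maven.org/maven2/org/mongodb/spark/mongo-spark-connector_2.11/2.4.2/mongo-spark-connector_2.11-2.4.2.jar
```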
