spark-sql reading from Kafka: "Failed to find data source: kafka." (even with uber-jar)


I'm using HDP 2.6.3.0 with the Spark2 package, version 2.2.0.

I'm trying to write a Kafka consumer using the Structured Streaming API, but I get the following error after submitting the job to the cluster:

Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at http://spark.apache.org/third-party-projects.html
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:553)
at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:89)
at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:89)
at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:198)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:90)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:90)
at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:150)
at com.example.KafkaConsumer.main(KafkaConsumer.java:21)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$runMain(SparkSubmit.scala:782)
at org.apache.spark.deploy.SparkSubmit$.doRunMain(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.sql.execution.datasources.DataSource$anonfun$anonfun$apply.apply(DataSource.scala:537)
at org.apache.spark.sql.execution.datasources.DataSource$anonfun$anonfun$apply.apply(DataSource.scala:537)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.sql.execution.datasources.DataSource$anonfun.apply(DataSource.scala:537)
at org.apache.spark.sql.execution.datasources.DataSource$anonfun.apply(DataSource.scala:537)
at scala.util.Try.orElse(Try.scala:84)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:537)
... 17 more

The spark-submit command:

$SPARK_HOME/bin/spark-submit \
     --master yarn \
     --deploy-mode client \
     --class com.example.KafkaConsumer \
     --executor-cores 2 \
     --executor-memory 512m \
     --driver-memory 512m \
     sample-kafka-consumer-0.0.1-SNAPSHOT.jar

My Java code:

package com.example;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class KafkaConsumer {
    public static void main(String[] args) {
        SparkSession spark = SparkSession
                  .builder()
                  .appName("kafkaConsumerApp")
                  .getOrCreate();
        Dataset<Row> ds = spark
                  .readStream()
                  .format("kafka")
                  .option("kafka.bootstrap.servers", "dog.mercadoanalitico.com.br:6667")
                  .option("subscribe", "my-topic")
                  .load();
    }
}
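Note that this code only defines the streaming source; nothing executes until a query is started. A minimal sketch of starting the query, with a console sink assumed purely for testing (not part of the original post):

        // Continuation of main(), which must declare `throws StreamingQueryException`.
        // Requires: import org.apache.spark.sql.streaming.StreamingQuery;
        StreamingQuery query = ds
                  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
                  .writeStream()
                  .format("console")   // test sink; any supported sink works
                  .start();
        query.awaitTermination();      // block until the stream terminates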

pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.example</groupId>
  <artifactId>sample-kafka-consumer</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <dependencies>
    <!-- spark -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.2.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.2.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
      <version>2.2.0</version>
    </dependency>
    <!-- kafka -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>0.10.1.0</version>
    </dependency>
  </dependencies>

  <repositories>
    <repository>
      <id>local-maven-repo</id>
      <url>file:///${project.basedir}/local-maven-repo</url>
    </repository>
  </repositories>

  <build>
    <resources>
      <resource>
        <directory>${basedir}/src/main/resources</directory>
      </resource>
    </resources>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.6.1</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.0.0</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>com.example.KafkaConsumer</mainClass>
                </transformer>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

[UPDATE] UBER-JAR

Below is the maven-shade-plugin configuration used in the pom.xml to generate the uber-jar:

            
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>3.0.0</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <filters>
          <filter>
            <artifact>*:*</artifact>
            <excludes>
              <exclude>META-INF/*.SF</exclude>
              <exclude>META-INF/*.DSA</exclude>
              <exclude>META-INF/*.RSA</exclude>
            </excludes>
          </filter>
        </filters>
        <transformers>
          <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
            <mainClass>com.example.KafkaConsumer</mainClass>
          </transformer>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>

The same question came up on Stack Overflow; here is the accepted answer:

kafka data source is an external module and is not available to Spark applications by default.

You have to define it as a dependency in your pom.xml (as you have done), but that's just the very first step to have it in your Spark application.

    
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
  <version>2.2.0</version>
</dependency>
    

With that dependency in place, you have to decide whether to create a so-called uber-jar that bundles all the dependencies together (which produces a fairly big jar file and lengthens submission time), or to use the --packages option (or the less flexible --jars) to add the dependency at spark-submit time.

(There are other options like storing the required jars on Hadoop HDFS or using Hadoop distribution-specific ways of defining dependencies for Spark applications, but let's keep things simple)

I'd recommend using --packages first and only when it works consider the other options.

Use spark-submit --packages to include the spark-sql-kafka-0-10 module as follows.

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0

Include the other command-line options as you wish.

Brief summary of solution 1:

In spark-submit, use the --packages option to pull in the matching version of the spark-sql-kafka package.

Alternatively, upload the jar of the matching version to HDFS, or to the same path on every node of the YARN cluster, and submit with:

spark-submit --jars /opt/cdn/stream-window/lib/spark-sql-kafka-0-10_2.11-2.3.0.jar

spark-submit --jars hdfs:///spark-sql-kafka-0-10_2.11-2.3.0.jar
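Putting it together with the submit command from this post, a --packages run would look like this (the coordinates must match your Spark/Scala build; 2.2.0 with Scala 2.11 here):

$SPARK_HOME/bin/spark-submit \
     --master yarn \
     --deploy-mode client \
     --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 \
     --class com.example.KafkaConsumer \
     --executor-cores 2 \
     --executor-memory 512m \
     --driver-memory 512m \
     sample-kafka-consumer-0.0.1-SNAPSHOT.jar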

 

Uber-Jar Approach

Including all the dependencies in a so-called uber-jar may not always work due to how META-INF directories are handled.

For the kafka data source (and data sources in general) to work, you have to ensure that the META-INF/services/org.apache.spark.sql.sources.DataSourceRegister files of all the data sources are merged, not replaced by the first one found or discarded by whatever strategy you use.

The kafka data source ships its own META-INF/services/org.apache.spark.sql.sources.DataSourceRegister, which registers org.apache.spark.sql.kafka010.KafkaSourceProvider as the data source provider for the kafka format.
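After a correct merge, that service file in the uber-jar contains one provider class per line. The built-in entries below are illustrative (they vary by Spark version); the point is that the Kafka line must be present alongside them:

org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
org.apache.spark.sql.execution.datasources.json.JsonFileFormat
org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
org.apache.spark.sql.kafka010.KafkaSourceProvider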

Solution 2:

For the uber-jar, first unpack it and confirm whether spark-sql-kafka-0-10_2.11-2.3.0.jar was actually bundled into the application jar.

If it was bundled:

Check whether the META-INF/services/org.apache.spark.sql.sources.DataSourceRegister file contains org.apache.spark.sql.kafka010.KafkaSourceProvider. If it doesn't, add that line manually.
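A quick way to run this check without unpacking the whole jar (unzip -p prints a single archive entry to stdout):

unzip -p sample-kafka-consumer-0.0.1-SNAPSHOT.jar \
    META-INF/services/org.apache.spark.sql.sources.DataSourceRegister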

This was exactly the problem I ran into.

The problem can also be fixed by changing the Maven packaging strategy; see the methods below:

Method 1:

I had a similar issue; it started when we upgraded our Cloudera Spark version from 2.2 to 2.3.

The issue was that my uber-jar's META-INF/services/org.apache.spark.sql.sources.DataSourceRegister was being overwritten by the file of the same name from other jars, so the Kafka entry could no longer be found in the DataSourceRegister file.

Resolution: modifying the pom.xml helped me.

  
        
             
<transformers>
  <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
    <resource>META-INF/services/org.apache.spark.sql.sources.DataSourceRegister</resource>
  </transformer>
</transformers>

Method 2:

For the uber-jar, adding the ServicesResourceTransformer to the shade plugin works for me:

<transformers>
  <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>

Method 3:

I faced the same error because I had excluded everything under META-INF in the shade plugin to fix the overlapping-resource warnings:

<exclude>META-INF/**</exclude>

But the class loader needs that resource to know which data sources are registered, so removing the exclude fixed it for me.

 
In particular, META-INF/services/org.apache.spark.sql.sources.DataSourceRegister must survive into the final jar.

Hope it helps someone.
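To verify at runtime what the class loader actually sees, here is a minimal Java sketch (my own debugging aid, not from the original post); it mirrors the ServiceLoader lookup that Spark's DataSource.lookupDataSource performs:

import java.util.ServiceLoader;
import org.apache.spark.sql.sources.DataSourceRegister;

public class ListDataSources {
    public static void main(String[] args) {
        // Enumerates every provider registered via
        // META-INF/services/org.apache.spark.sql.sources.DataSourceRegister.
        // If no line prints "kafka", format("kafka") will fail as above.
        for (DataSourceRegister r : ServiceLoader.load(DataSourceRegister.class)) {
            System.out.println(r.shortName() + " -> " + r.getClass().getName());
        }
    }
}

Run it with the uber-jar (and the Spark jars) on the classpath to see whether the kafka provider is actually registered.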

Reference:

apache spark - Why does format("kafka") fail with "Failed to find data source: kafka." (even with uber-jar)? - Stack Overflow
