spark-sql reading from Kafka: "Failed to find data source: kafka." (even with uber-jar)
I use HDP-2.6.3.0 with Spark2 package 2.2.0.
I'm trying to write a Kafka consumer using the Structured Streaming API, but I'm getting the following error after submitting the job to the cluster:
Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at http://spark.apache.org/third-party-projects.html
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:553)
    at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:89)
    at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:89)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:198)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:90)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:90)
    at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:150)
    at com.example.KafkaConsumer.main(KafkaConsumer.java:21)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$runMain(SparkSubmit.scala:782)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.apache.spark.sql.execution.datasources.DataSource$anonfun$anonfun$apply.apply(DataSource.scala:537)
    at org.apache.spark.sql.execution.datasources.DataSource$anonfun$anonfun$apply.apply(DataSource.scala:537)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.sql.execution.datasources.DataSource$anonfun.apply(DataSource.scala:537)
    at org.apache.spark.sql.execution.datasources.DataSource$anonfun.apply(DataSource.scala:537)
    at scala.util.Try.orElse(Try.scala:84)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:537)
    ... 17 more
My spark-submit command:
$SPARK_HOME/bin/spark-submit \
    --master yarn \
    --deploy-mode client \
    --class com.example.KafkaConsumer \
    --executor-cores 2 \
    --executor-memory 512m \
    --driver-memory 512m \
    sample-kafka-consumer-0.0.1-SNAPSHOT.jar
My java code:
package com.example;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class KafkaConsumer {
    public static void main(String[] args) {
        SparkSession spark = SparkSession
            .builder()
            .appName("kafkaConsumerApp")
            .getOrCreate();

        Dataset<Row> ds = spark
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "dog.mercadoanalitico.com.br:6667")
            .option("subscribe", "my-topic")
            .load();
    }
}
pom.xml:
<project>
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example</groupId>
    <artifactId>sample-kafka-consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.1.0</version>
        </dependency>
    </dependencies>

    <repositories>
        <repository>
            <id>local-maven-repo</id>
            <url>file:///${project.basedir}/local-maven-repo</url>
        </repository>
    </repositories>

    <build>
        <resources>
            <resource>
                <directory>${basedir}/src/main/resources</directory>
            </resource>
        </resources>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.example.KafkaConsumer</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
[UPDATE] UBER-JAR
Below is the configuration used in the pom.xml to generate the uber-jar:
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>3.0.0</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <filters>
                    <filter>
                        <artifact>*:*</artifact>
                        <excludes>
                            <exclude>META-INF/*.SF</exclude>
                            <exclude>META-INF/*.DSA</exclude>
                            <exclude>META-INF/*.RSA</exclude>
                        </excludes>
                    </filter>
                </filters>
                <transformers>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass>com.example.KafkaConsumer</mainClass>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>
The above is the same problem as the one asked on Stack Overflow; the answer there reads as follows:
kafka data source is an external module and is not available to Spark applications by default.
You have to define it as a dependency in your pom.xml (as you have done), but that's just the very first step to have it in your Spark application.
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
With that dependency you have to decide whether you want to create a so-called uber-jar that would have all the dependencies bundled altogether (that results in a fairly big jar file and makes the submission time longer) or use --packages (or less flexible --jars) option to add the dependency at spark-submit time.
(There are other options like storing the required jars on Hadoop HDFS or using Hadoop distribution-specific ways of defining dependencies for Spark applications, but let's keep things simple)
I'd recommend using --packages first and only when it works consider the other options.
Use spark-submit --packages to include the spark-sql-kafka-0-10 module as follows.
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0
Include the other command-line options as you wish.
Brief summary of solution 1:
Use the --packages option in spark-submit to pull in the matching version of the spark-sql-kafka package (a complete command sketch follows below).
Alternatively, first upload the jar of the matching version to HDFS, or to a fixed path on every node of the YARN cluster, and then pass it at submit time with one of the following:
spark-submit --jars /opt/cdn/stream-window/lib/spark-sql-kafka-0-10_2.11-2.3.0.jar
spark-submit --jars hdfs:///spark-sql-kafka-0-10_2.11-2.3.0.jar
Either form is enough for Spark to find the data source.
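For illustration, here is the --packages route applied to the submit command from the question above (the jar name, main class, and memory settings are the ones from the original question; the package version has to match your Spark build):

$SPARK_HOME/bin/spark-submit \
    --master yarn \
    --deploy-mode client \
    --class com.example.KafkaConsumer \
    --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 \
    --executor-cores 2 \
    --executor-memory 512m \
    --driver-memory 512m \
    sample-kafka-consumer-0.0.1-SNAPSHOT.jar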
Uber-Jar Approach
Including all the dependencies in a so-called uber-jar may not always work due to how META-INF directories are handled.
For the kafka data source to work (and other data sources in general), you have to ensure that the META-INF/services/org.apache.spark.sql.sources.DataSourceRegister files of all the data sources are merged (not replaced, taken from the first jar only, or handled by whatever other strategy you use).
The kafka data source uses its own META-INF/services/org.apache.spark.sql.sources.DataSourceRegister that registers org.apache.spark.sql.kafka010.KafkaSourceProvider as the data source provider for the kafka format.
Solution 2:
For an uber-jar, first unpack it and check whether spark-sql-kafka-0-10_2.11-2.3.0.jar has actually been bundled into the application jar:
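For example, with the jar name from the question, a quick check can be done with the standard jar and unzip tools:

# list the classes from the kafka module that ended up inside the uber-jar
jar tf sample-kafka-consumer-0.0.1-SNAPSHOT.jar | grep kafka010

# print the service registration file that Spark's data source lookup reads
unzip -p sample-kafka-consumer-0.0.1-SNAPSHOT.jar META-INF/services/org.apache.spark.sql.sources.DataSourceRegister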
If it has already been bundled into the application jar:
Check whether the META-INF/services/org.apache.spark.sql.sources.DataSourceRegister file contains org.apache.spark.sql.kafka010.KafkaSourceProvider. If it does not, add that line manually:
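If the shade plugin kept only the copy of that file from the spark-sql jar, the Kafka line is missing. After the fix the file should list the built-in providers plus the Kafka one, roughly like this (the exact set of built-in entries depends on the Spark version):

org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
org.apache.spark.sql.execution.datasources.json.JsonFileFormat
org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
org.apache.spark.sql.execution.datasources.text.TextFileFormat
org.apache.spark.sql.kafka010.KafkaSourceProvider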
This is exactly the problem I ran into.
The problem can also be fixed by changing the Maven packaging strategy; see the approaches below:
Approach 1:
I had a similar issue; it started when we upgraded the Cloudera Spark version from 2.2 to 2.3.
The issue was: the META-INF/services/org.apache.spark.sql.sources.DataSourceRegister in my uber jar was getting overwritten by the same file from other jars, so Spark could not find the Kafka data source entry in the DataSourceRegister file.
Resolution: modifying the pom.xml helped me, so that the shade plugin appends the entries of META-INF/services/org.apache.spark.sql.sources.DataSourceRegister instead of letting one copy overwrite the others.
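A common way to do this with the shade plugin is an AppendingTransformer for that specific resource; a minimal sketch (not necessarily the answerer's exact snippet):

<transformers>
    <!-- concatenate every copy of the DataSourceRegister service file instead of keeping only one -->
    <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
        <resource>META-INF/services/org.apache.spark.sql.sources.DataSourceRegister</resource>
    </transformer>
</transformers>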
Approach 2:
For an uber-jar, adding the ServicesResourceTransformer to the shade plugin works for me.
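A minimal sketch of that configuration (the ServicesResourceTransformer merges every META-INF/services file from all jars, so no individual resource has to be listed):

<transformers>
    <!-- merge all META-INF/services files, including DataSourceRegister -->
    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>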
Approach 3:
I faced the same error because I had excluded everything under META-INF in the shade plugin, in order to fix the shade overlapping-resource warnings:
META-INF/**
But the class loader needs this resource to know which data sources are registered, so I removed the exclusion of this file, and it works fine for me:
META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
Hope it helps someone.
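To make approach 3 concrete: keep the signature-file exclusions but do not blanket-exclude META-INF/**. A sketch of such a filter, based on the pom.xml above rather than the answerer's exact configuration:

<filter>
    <artifact>*:*</artifact>
    <excludes>
        <exclude>META-INF/*.SF</exclude>
        <exclude>META-INF/*.DSA</exclude>
        <exclude>META-INF/*.RSA</exclude>
        <!-- no META-INF/** exclusion here, so META-INF/services/org.apache.spark.sql.sources.DataSourceRegister survives -->
    </excludes>
</filter>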
References:
apache spark - Why does format("kafka") fail with "Failed to find data source: kafka." (even with uber-jar)? - Stack Overflow