java spark sql操作cassandar
前期准备:
cassandra集群(可以参考网站 https://cassandrazh.github.io/)
spark集群(可以参考我的文章 http://www.jianshu.com/p/756209fa7078)
1、spark中配置cassandar相应的jar包
不配置会报如下异常:ClassNotFoundException:com.datastax.spark.connector.rdd.partitioner.CassandraPartition
怎么配置:
$SPARK_HOME/bin/spark-shell --packages datastax:spark-cassandra-connector:1.6.0-s_2.10
1.6.0指的是spark的版本,s_2.10是scala的版本。
具体版本对应可以参看这个网站:https://spark-packages.org/package/datastax/spark-cassandra-connector
如果用spark-shell命令提交不成功,可以直接在SPARK的spark-env.sh配置中加入SPARK_CLASSPATH
export SPARK_CLASSPATH=$SPARK_CLASSPATH:/opt/spark/spark-cassandra-lib/datastax_spark-cassandra-connector-1.6.0-s_2.10.jar:/opt/spark/spark-cassandra-lib/com.datastax.cassandra_cassandra-driver-core-3.0.0.jar:/opt/spark/spark-cassandra-lib/com.google.guava_guava-16.0.1.jar:/opt/spark/spark-cassandra-lib/com.twitter_jsr166e-1.1.0.jar
记得集群中的每台spark都要加,具体版本具体下载。
2、Eclipse新建一个maven项目
引入相关包(这里引入的spark包版本一定要对应上集群环境安装的spark版本,否则序列化和反序列会失败):
分别引入:spark-core,spark-sql,spark-cassandra-connector
groupId:org.apache.spark
artifactId:spark-core_2.10
version:1.6.0
groupId:org.apache.spark
artifactId:spark-sql_2.10
version:1.6.0
groupId:com.datastax.spark
artifactId:spark-cassandra-connector_2.10
version:1.6.0
如果出现因为google guava造成的错误,可以在引用上面3个包的时候去掉google guava,然后单独引用一个版本.
版本不一样的错误异常:
Caused by: java.lang.RuntimeException: java.io.InvalidClassException: org.apache.spark.rpc.RpcEndpointRef; local class incompatible: stream classdesc serialVersionUID = -1223633663228316618, local class serialVersionUID = 18257903091306170
3、新建一个测试类SparkCassandraConnector(基于spark1.6,注意spark2.x开始不一样,后文有提到)
public class SparkCassandraConnector {
private static final String APP_NAME = "SPARK_CASSANDRA_TESTI"; //这里随便写
private static final String MASTER = "spark://172.16.101.60:7077"; //spark集群地址
private static final String HOST = "172.16.101.60"; //cassandra安装主机地址
private static SparkConf conf = null;
private static SparkContext sparkContext = null;
static {
conf = new SparkConf();
conf.setAppName(APP_NAME);
conf.setMaster(MASTER);
conf.set("spark.cassandra.connection.host", HOST);
sparkContext = new SparkContext(conf); // 初始化一个sparkContext
/* 这里要注意几个地方:1、1.6用的是sparkcontext,2.x开始用sparksession。
2、spark集群中每次只能有一个sparkcontext,当然可以通过配置修改
3、基于以上,所以这里初始化一个sparkcontext,并且不关闭它(缺点也显而易见)
*/
}
public static void main(String[] args) {
CassandraSQLContext csc = new CassandraSQLContext(sparkContext);
csc.setKeyspace("bi"); // 设置cassandra的keyspace ,keyspace有点类似数据库的意思
csc.sql("select * from test").show(); // 这里可以写任何spark支持的sql语句
}
}
spark2开始spark开始使用spark session,而且datasetx将spark-cassandra-connector中的CassandraSQLContext类移除了:
先得到spark session:
SparkSession spark = SparkSession.builder().appName(APP_NAME).config("spark.cassandra.connection.host",HOST).master(MASTER).getOrCreate();
然后将spark的配置文件传递给CassandraConnector:
CassandraConnector connector = CassandraConnector.apply(spark.sparkContext().conf());
这里可以直接通过CassandraConnector这个对象去对cassandra建表,删除表:
Sessionsession=connector.openSession();
session.execute("CREATE TABLE mykeyspace.people(id UUID PRIMARY KEY, username TEXT, email TEXT)");
通过spark得到spark的dataset:
Dataset dataset = spark.read().format("org.apache.spark.sql.cassandra").options(new HashMap() {
{
put("keyspace", "bi"); // cassandra keyspace
put("table", "people"); // cassandra表名
}
}).load();
Dataset dataset2 = spark.read().format("org.apache.spark.sql.cassandra").options(new HashMap() {
{
put("keyspace", "bi");
put("table", "people2");
}
}).load();
上面构造了两个dataset,分别是dataset和dataset2,我假设这2个dataset表结构一样,现在对他做左连接操作:
dataset.createOrReplaceTempView("usertable");
dataset2.createOrReplaceTempView("usertable2");
Dataset dataset3 = spark.sql("select * from usertable left join usertable2 on usertable.id = usertable2.id");
dataset3.show();
不用spark集群,用local模式:
spark是可以不用安装集群,用local模式来写出demo的。
只要将上面的MASTER参数改为local就行了。
但是spark2改为local模式的时候要注意,要在配置中设置:config("spark.sql.warehouse.dir", "file:///D://333");