外部数据源实现的主要接口与类
上一篇我们已经介绍了DataFrame构建的三种方法:RDD转DataFrame,Hive表,外部数据源。今天我们来重点介绍一下外部数据源是怎么实现的,以及我们将手动实现一个外部数据源。
format().load()的调用方
RelationProvider
我们在使用外部数据源时使用的语法是:
val spark = SparkSession.builder().master("local[2]").appName("test").getOrCreate()
val df = spark.read.format("json").load("/home/doudou/data/person.json")
其实在这个过程当中,就是在底层调用了RelationProvider这个接口的实现类里面的createRelation这个方法。从类名还有方法名,我们能推倒出我们调用这个方法的目的是为了创建一个Relation的对象。我们先从源码的角度来看下这究竟是个什么东西。
从上面的图片中我们可以解读到,这个接口的实现类是可以为某一种外部数据源来产生一种relation,当我们的实现类实现了这个接口后我们就要重写createRelation这个方法,它的返回值是一个BaseRelaion对象(后面我们也将会介绍这个抽象类的作用)。
SchemaRelationProvider
这个接口其实和上面那个RelationProvider是差不多的,里面同样要重写一个createRelation方法,但是这个方法比上面接口的createRelation方法多了一个参数,我们再从源码的角度来看看:
这个接口的介绍和上面的那个是很像的,其实都是一个提供BaseRelation的方法而已。
DataSourceRegister
这也是一个接口,里面提供了一个简单的方法shortName。这个方法是一个提供别名的方法,那为什么要提供别名呢?我们在使用format这个方法的时候都是传入诸如:“jdbc”,“json”,“csv”,"parquet"这样简短的字符串,其实我们需要传入的是一个完整的包名,但是就是因为使用了这个方法,我们可以直接使用这种简短的字符串就能定义好我们要使用的外部数据源了。
那么format().load()在调用的是谁呢?DefaultSource这个类,而这个类是要实现我们上面提到的这些接口的,否则将无法产生Relation对象,进而将无法得到我们最终想要的DataFrame对象的。
DataFrame从哪里来的
上面我们一直提高到的Relation对象究竟是个什么鬼?我们从源码中找答案:
从这个方法的介绍我们可以知道,这个方法是将一个由外部数据源创建的BaseRelation对象转成一个DataFrame,传入的行参是一个BaseRelation,返回值是DataFrame。由此我们可以知道在使用外部数据源的时候,程序是先创建了一个BaseRelation对象的,然后再转化成一个DataFrame。
那么为什么一定要先有一个BaseRelation对象,再来转化成一个DataFrame呢?
BaseRelation抽象类
我们还是先看下源码:
首先这是一个抽闲类(一个带有schema信息的数据集,我们可以理解为这是一个DataFrame的雏形吧),那么我们实际上是使用它的继承子类来构建具体的Relation了。在这个抽象类里面,有两个抽象方法需要我们来重写的,一个是schema:这个方法是用来构建我们Relation的schema信息的;另一个是sqlContext:这个方法就是使用到SQLContext的对象,因为在Spark2.x版本之前,我们不是使用SparkSession的,而是使用SQLContext的,现在只是将SQLContext整合到SparkSession里面而已。
由上面我们可以知道为什么我们在使用外部数据源的时候可以自动地给出Schema信息了,而不需要我们自己手动构建一个StructType传进去了。因为schema这个方法就是用来从外部数据源那里解析schema的。
TableScan
这个一个接口,里面有一个抽闲方法buildScan,这个方法的返回值是一个RDD[Row],但是这个方法只能获取外部数据源的全部数据信息。
PrunedScan
这个接口里面同样提供了一个buildScan这样的抽象方法,返回值也是一个RDD[Row],但是这个方法使用行参的。所以实际上这个方法是对数据源的数据进行列裁剪以后在获取到数据的。
里面的buildScan方法需要传入个以String为泛型的数据,数据里面转载的就是我们需要的列名。
PrunedFilterScan
同上,就是再多了一个行过滤,所以里面的buildScan方法有多了一个行参,Array[Filter]表示行过滤的Filter。
InsertableRelation
能够把数据从数据源获取得到,那么肯定可以将数据写回到数据源那里吧。这个接口的里面的insert方法就是完成这个任务的。
实验部分
上面说了那么多个接口和类,下面我们将分别进行两个实验来完成从查阅到数据的schema信息,到可以自由读写数据,最后我们直接使用别名就能完成我们的数据获取了。
实验一
这里我的schema就写死了,因为我们获取到的数据就是长那个样的(这种方法就是在生产当中,为了方便某一个业务线而增添,如果想要再便捷一点的话,可以考虑自己直接解析数据)
这个是Relation的代码,它是直接继承了BaseRelation这个抽闲类的
package scala.spark.text
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
class DouRelation(val Schema:StructType,val SQLContext: SQLContext) extends BaseRelation {
override def sqlContext: SQLContext = sqlContext;
override def schema: StructType = {
if(Schema != null){
Schema
}else{
StructType{
Array(
StructField("id",IntegerType,false),
StructField("name",StringType,false),
StructField("age",IntegerType,false)
)
}
}
}
}
这个是Provider的实现类,类名一定要是DefaultSource才能生效的
package scala.spark.text
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, SchemaRelationProvider}
import org.apache.spark.sql.types.StructType
class DefaultSource extends RelationProvider with SchemaRelationProvider{
override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = {
createRelation(sqlContext,parameters,null)
}
override def createRelation(sqlContext: SQLContext, parameters: Map[String, String], schema: StructType): BaseRelation = {
new DouRelation(schema,sqlContext)
}
}
然后我们使用读取外部数据源的方法来获取我们这个自定义的外部数据源的方式,这里的format要填入我们定义这个三个类的包名:scala.spark.text,用双引号括起来。
package scala.spark.text
import org.apache.spark.sql.SparkSession
object RelationApp {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().master("local[2]").getOrCreate()
val df = spark.read.format("scala.spark.text").load()
df.printSchema()
spark.close()
}
}
下面是运行结果:
当我们使用printSchema这个方法的时候,就能直接获取到schema了,因为现在这个df没有数据但是确有schema信息了。
实验二
实现数据的输出,这里我们就只实现一个TableScan的方法来获取外部数据源全量的数据,另外几个接口我就不在这里显示了,有兴趣的小伙伴请自行实现吧。
Relation类的实现类
package scala.spark.text
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
class DouRelation(val Schema:StructType,val SQLContext: SQLContext,val path:String) extends BaseRelation with TableScan{
override def sqlContext: SQLContext = SQLContext;
override def schema: StructType = {
if(Schema != null){
Schema
}else{
StructType{
Array(
StructField("id",IntegerType,false),
StructField("name",StringType,false),
StructField("age",IntegerType,false)
)
}
}
}
//这里就是多了一个buildScan的方法来获取目标数据源的数据,获取的是一中RDD,所以我们就直接使用RDD的获取方法
override def buildScan(): RDD[Row] = {
val rdd = SQLContext.sparkContext.textFile(path)
val Rdd = rdd.map(x => x.split(",")).map(x => Row(x(0).toInt,x(1),x(2).toInt))
Rdd
}
}
接着是DefaultSource这个类的类体,这里的写法其实还不算最好的写法,第三个实验我们将会给出标准的写法
package scala.spark.text
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, SchemaRelationProvider}
import org.apache.spark.sql.types.StructType
class DefaultSource extends RelationProvider with SchemaRelationProvider{
override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = {
createRelation(sqlContext,parameters,null)
}
override def createRelation(sqlContext: SQLContext, parameters: Map[String, String], schema: StructType): BaseRelation = {
val path = parameters.get("path")
//这里是scala最为特殊的地方,就是使用Map的get方法获取key后得到的不是一个简单的value,而已一个Some(value)的对象,这个时候可以再次使用get方法
val p = path.getOrElse(" ")
new DouRelation(schema,sqlContext,p)
}
}
数据读取的过程
package scala.spark.text
import org.apache.spark.sql.SparkSession
object RelationApp {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().master("local[2]").getOrCreate()
val df = spark.read.format("scala.spark.text").load("/Users/*****/*****/person")
df.printSchema()
df.show()
spark.close()
}
}
最后的输出显示:
|
|