// 读取配置文件信息
val masterUrl = Props.get("master", "local")
val appName = Props.get("appName", "Test7")
val className = Props.get("className", "")
val kafkaBootstrapServers = Props.get("kafka.bootstrap.servers", "localhost:9092")
val subscribe = Props.get("subscribe", "test")
val tmpTable = Props.get("tmpTable", "tmp")
val sparksql = Props.get("sparksql", "select * from tmp")
val spark = SparkSession.builder()
.master(masterUrl)
.appName(appName)
.getOrCreate()
val values = lines.selectExpr("cast(value as string)").as[String]
val res = values.map { value =>
// 将json数据解析成list集合
val list = Tools.parseJson(value, className)
// 将List转成元组
Tools.list2Tuple7(list)
}
res.createOrReplaceTempView(tmpTable)
val result = spark.sql(sparksql)
val query = result.writeStream
.format("console")
.outputMode("append")
.start()
query.awaitTermination()
}
}
Tools:解析json的工具类
package com.test
import com.google.gson.Gson
import scala.collection.mutable
object Tools {
def main(args: Array[String]): Unit = {
val tools = new Tools()
val res = tools.parse("{'name':'caocao','age':'32','sex':'male'}", "com.test.People")
println(res)
}
def parseJson(json: String, className: String): List[String] = {
val tools = new Tools()
tools.parse(json, className)
}
// 将List转成Tuple7元组类,这里仅仅是定义7个字段,可以定义更多字段。(ps:这种处理方式很不雅,一时也没想到好办法)
def list2Tuple7(list: List[String]): (String, String, String, String, String, String, String) = {
val t = list match {
case List(a) => (a, "", "", "", "", "", "")
case List(a, b) => (a, b, "", "", "", "", "")
case List(a, b, c) => (a, b, c, "", "", "", "")
case List(a, b, c, d) => (a, b, c, d, "", "", "")
case List(a, b, c, d, e) => (a, b, c, d, e, "", "")
case List(a, b, c, d, e, f) => (a, b, c, d, e, f, "")
case List(a, b, c, d, e, f, g) => (a, b, c, d, e, f, g)
case _ => ("", "", "", "", "", "", "")
}
t
}
}
class Tools {
// 通过传进来的Bean的全类名,进行反射,解析json,返回一个List()
def parse(json: String, className: String): List[String] = {
val list = mutable.ListBuffer[String]()
val gson = new Gson()
val clazz = Class.forName(className)
val obj = gson.fromJson(json, clazz)
val aClass = obj.getClass
val fields = aClass.getDeclaredFields
fields.foreach { f =>
val fName = f.getName
val m = aClass.getDeclaredMethod(fName)
val value = m.invoke(obj).toString
list.append(value)
}
list.toList
}
}
People类和Student类
case class People(name: String, age: String, sex: String) extends Serializable
case class Student(name: String, age: String, sex: String, idNum: String) extends Serializable
二、配置文件
people.properties
master=local
appName=Test7
className=com.test.People
kafka.bootstrap.servers=localhost:9092
subscribe=test
tmpTable=tmp
sparksql=select _1 as name, _2 as age, _3 as sex from tmp
student.properties
master=local
appName=Test7
className=com.test.Student
kafka.bootstrap.servers=localhost:9092
subscribe=test
tmpTable=tmp
sparksql=select _1 as name, _2 as age, _3 as sex, _4 as idNum from tmp