<properties>
    <scala.version>2.11.8</scala.version>
    <hadoop.version>2.7.4</hadoop.version>
    <spark.version>2.0.2</spark.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.32</version>
    </dependency>
    <!-- Spark SQL dependency -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
</dependencies>
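The POM above only declares the library dependencies. To actually compile the Scala sources with Maven, a Scala build plugin is normally added as well; the snippet below is a minimal sketch and not part of the original post (the scala-maven-plugin coordinates and version are assumptions):

<build>
    <plugins>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>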
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession

// 1. Create the SparkSession, specifying the app name and the master the job is submitted to.
val spark: SparkSession = SparkSession.builder().appName("CaseClassSchema").master("local[2]").getOrCreate()
// 2. Get the SparkContext; it is needed to build the RDD in the next step.
val sc: SparkContext = spark.sparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame

// 3. Read each line into an RDD and convert the RDD to a DataFrame via the case class schema.
val lineRdd: RDD[Array[String]] = sc.textFile("hdfs://node01:8020/spark_res/people.txt").map(_.split(", "))
val peopleRdd: RDD[People] = lineRdd.map(x => People(x(0), x(1).toInt))
import spark.implicits._
val peopleDF: DataFrame = peopleRdd.toDF
// 4. Operate on the DataFrame.
peopleDF.printSchema()
peopleDF.show()
println(peopleDF.head())
println(peopleDF.count())
peopleDF.columns.foreach(println)
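The snippet above refers to a People case class that is not shown in the post. A minimal sketch matching the two fields actually used (a String name and an Int age), together with the comma-plus-space line format that split(", ") expects, would look like this (the sample rows are illustrative only):

case class People(name: String, age: Int)

// people.txt is assumed to contain lines such as:
// Michael, 29
// Andy, 30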
// DSL style
peopleDF.select("name", "age").show()
peopleDF.filter($"age" > 20).groupBy("name").count().show()
// SQL style
peopleDF.createOrReplaceTempView("t_people")
spark.sql("select * from t_people order by age desc").show()
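For comparison, the descending sort written in SQL above can also be expressed in the DSL style; a small sketch reusing the same peopleDF:

peopleDF.orderBy($"age".desc).show()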
import java.util.Properties

val properties = new Properties()
properties.setProperty("user", "root")
properties.setProperty("password", "123456")
// Key code: read the iplocation table over JDBC.
val ipLocationDF: DataFrame = spark.read.jdbc("jdbc:mysql://localhost:3306/iplocation", "iplocation", properties)
ipLocationDF.printSchema()
ipLocationDF.show()
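The same read can also be written with the generic DataFrameReader option API; a minimal sketch assuming the same database, table, and credentials as above:

val ipLocationDF2: DataFrame = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/iplocation")
  .option("dbtable", "iplocation")
  .option("user", "root")
  .option("password", "123456")
  .load()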
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}

object SparkSqlToMysql {
  def main(args: Array[String]): Unit = {
    // Build the SparkSession as in the earlier examples.
    val spark: SparkSession = SparkSession.builder().appName("SparkSqlToMysql").master("local[2]").getOrCreate()
    // 1. Read the JSON data through the SparkSession and get a DataFrame.
    val peopleDF: DataFrame = spark.read.json(args(0))
    // 2. Register the DataFrame as table t_people and query it with SQL.
    peopleDF.createOrReplaceTempView("t_people")
    val resultDF = spark.sql("select * from t_people")
    // 3. Save the query result to MySQL over JDBC.
    val properties = new Properties()
    properties.setProperty("user", "root")
    properties.setProperty("password", "123456")
    resultDF.write.jdbc("jdbc:mysql://192.168.52.105:3306/iplocation", "spark_save_result", properties)
    // Close the SparkSession (this also stops the underlying SparkContext).
    spark.stop()
  }
}
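By default, write.jdbc fails if the target table already exists; a save mode can be set explicitly. A small sketch reusing the resultDF and connection properties above (the choice of Append here is illustrative):

import org.apache.spark.sql.SaveMode

resultDF.write
  .mode(SaveMode.Append)
  .jdbc("jdbc:mysql://192.168.52.105:3306/iplocation", "spark_save_result", properties)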