A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

 找回密码
 加入黑马

QQ登录

只需一步,快速开始

Structured Streaming写入Mysql

Structured Streaming在Write的过程,并没有提供jdbc的写入format格式。所以需要自己通过foreach自己实现。具体实现代码如下:

StructuredWriteMysql类
package com.test

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession


object StructuredWriteMysql {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

    val spark = SparkSession.builder()
      .master("local")
      .appName("Test")
      .getOrCreate()

    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9998)
      .load()

    lines.createOrReplaceTempView("tmp1")

    val lines2 = spark.sql("select split(value,',') as a from tmp1")

    lines2.createOrReplaceTempView("tmp2")

    val result = spark.sql("select a[0] as name, a[1] as age, a[2] as sex,a[3] as uuid from tmp2")

    val mysqlSink = new MysqlSink("jdbc:mysql://localhost:3306/test", "root", "root")

    val query = result.writeStream
      .outputMode("append")
      .foreach(mysqlSink)
      .start()

    query.awaitTermination()
  }
}

open(),process(),close()会依次执行。

自定义MysqlSink类
package com.test

import java.sql.{Connection, DriverManager}

import org.apache.spark.sql.{ForeachWriter, Row}

class MysqlSink(url: String, user: String, pwd: String) extends ForeachWriter[Row] {

  var conn: Connection = _

  override def open(partitionId: Long, epochId: Long): Boolean = {

    Class.forName("com.mysql.jdbc.Driver")
    conn = DriverManager.getConnection(url, user, pwd)
    true
  }

  override def process(value: Row): Unit = {
    val p = conn.prepareStatement("replace into people(name,age,sex,uuid) values(?,?,?,?)")
    p.setString(1, value(0).toString)
    p.setString(2, value(1).toString)
    p.setString(3, value(2).toString)
    p.setString(4, value(3).toString)
    p.execute()
  }

  override def close(errorOrNull: Throwable): Unit = {
    conn.close()
  }
}

直接运行,在nc -lk 9998中输入下面数据

caocao,32,male,1001
liubei,30,male,1002
guanyu,28,male,1003

查询mysql:



具体见博主原博客,项目参见github

---------------------
【转载,仅作分享,侵删】
作者:张行之
原文:https://blog.csdn.net/qq_33689414/article/details/86305739
版权声明:本文为博主原创文章,转载请附上博文链接!

1 个回复

倒序浏览
奈斯,感谢分享
回复 使用道具 举报
您需要登录后才可以回帖 登录 | 加入黑马