
1. Flink reading a socket stream: Word Count example

import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.time.Time

object SocketWindowWordCount {

  // Data type for the counts; keyBy/sum below address its fields by name
  case class WordWithCount(word: String, count: Long)

  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Read lines from a local socket, delimited by '\n'
    val res: DataStream[String] = env.socketTextStream("localhost", 9998, '\n')

    val wordCounts = res
      .flatMap { w => w.split(",") }
      .map(w => WordWithCount(w, 1))
      .keyBy("word")
      // Sliding window: 5 seconds long, evaluated every second
      .timeWindow(Time.seconds(5), Time.seconds(1))
      .sum("count")

    wordCounts.print()

    env.execute("SocketWindowWordCount")
  }
}
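To try this locally, open the socket before starting the job, e.g. with netcat (nc -lk 9998), then type comma-separated words; the sliding window prints updated counts every second.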

2. Flink reading a text file: Word Count example

import org.apache.flink.core.fs.FileSystem
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}

object TextWindowWordCount {
  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Set the parallelism
    env.setParallelism(2)

    val values = env.readTextFile("/Users/zhangzhiqiang/Documents/test_project/flinkdemo/data")

    values.print()

    val res = values.flatMap(_.split(","))
      .filter(_.nonEmpty)
      .map((_, 1))
      .keyBy(0)
      .sum(1)

    res.writeAsCsv("/Users/zhangzhiqiang/Documents/test_project/flinkdemo/data2", FileSystem.WriteMode.NO_OVERWRITE)

    env.execute()
  }
}
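With parallelism 2, writeAsCsv produces a directory containing one part file per parallel subtask. A minimal sketch, assuming the same res and output path as above, that forces a single output file by dropping the sink to parallelism 1:

// Sketch: run only the CSV sink at parallelism 1 so Flink writes one file
// instead of a directory of part files
res.writeAsCsv("/Users/zhangzhiqiang/Documents/test_project/flinkdemo/data2",
  FileSystem.WriteMode.NO_OVERWRITE).setParallelism(1)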

3. Flink reading a CSV file


import org.apache.flink.api.scala.{ExecutionEnvironment, _}


object ReadCsvFile {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // Map each row directly into a Student (the case class acts as the schema)
    val values = env.readCsvFile[Student]("/Users/zhangzhiqiang/Documents/test_project/flinkdemo/data/1.csv")
    values.print()
  }
}

The Student class:

case class Student(name: String, age: Int, sex: String, id: String)
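readCsvFile maps the CSV columns positionally onto the case class fields (name, age, sex, id), so a matching 1.csv might look like this (hypothetical sample rows):

Tom,18,male,001
Lily,17,female,002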
4. Flink reading a CSV file and transforming it with Table SQL

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object ReadTableCsvFile {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val input = env.readCsvFile[Student]("/Users/zhangzhiqiang/Documents/test_project/flinkdemo/data/1.csv")

    input.print()

    tableEnv.registerDataSet("student", input)

    val result = tableEnv.sqlQuery("select * from student")

    result.printSchema()

    result.toDataSet[Student].print()
  }
}
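The registered table supports more than select *. A minimal sketch of a filtering query, assuming the same tableEnv as above (the age >= 18 condition is just an illustration):

// Sketch: SQL can project and filter the registered "student" table
val adults = tableEnv.sqlQuery("select name, age from student where age >= 18")
adults.toDataSet[(String, Int)].print()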

Or apply some transformations, for example:

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object ReadTableCsvFile2 {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Get the table environment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // Read the data
    val input = env.readCsvFile[Student]("/Users/zhangzhiqiang/Documents/test_project/flinkdemo/data/1.csv")

    input.print()

    // Convert the DataSet into a Table
    val table = tableEnv.fromDataSet(input)

    // Register the Table
    tableEnv.registerTable("student", table)

    // SQL query: project three of the four columns
    val result = tableEnv.sqlQuery("select name,age,sex from student")

    result.printSchema()

    // Convert the result back to a DataSet and print it
    result.toDataSet[People].print()
  }
}

The People class:

case class People(name: String, age: Int, sex: String)
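Besides SQL strings, the same projection can be written with the Table API's expression DSL. A minimal sketch, assuming the table value and imports from ReadTableCsvFile2 above:

// Sketch: symbol-based expressions come from org.apache.flink.table.api.scala._
val projected = table.select('name, 'age, 'sex)
projected.toDataSet[People].print()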
5. Flink reading a Kafka stream: Word Count

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010

object KafkaWordCountStreaming {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "test")
    val stream = env.addSource(new FlinkKafkaConsumer010[String]("test", new SimpleStringSchema(), properties))

    stream.print()

    // Tumbling 10-second windows (processing time by default)
    val result = stream.flatMap(x => x.split(","))
      .map(x => (x, 1)).keyBy(0)
      .timeWindow(Time.seconds(10))
      .sum(1)

    result.print()

    env.execute("KafkaWordCountStreaming")
  }
}
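To feed the job, publish comma-separated messages to the test topic, e.g. with Kafka's console producer: kafka-console-producer.sh --broker-list localhost:9092 --topic test.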

6. Flink reading a Kafka stream and transforming it before output

import java.util.Properties

import com.google.gson.Gson
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010

object KafkaJsonStreaming {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val p = new Properties()
    p.setProperty("bootstrap.servers", "localhost:9092")
    p.setProperty("group.id", "test")

    val stream = env.addSource(new FlinkKafkaConsumer010[String]("test", new SimpleStringSchema(), p))

    stream.print()

    val result = stream.map { x =>
      // Parse each JSON message into a People instance
      // (a new Gson per record works, but a shared instance would be cheaper)
      val g = new Gson()
      val people = g.fromJson(x, classOf[People])
      people
    }

    result.print()

    env.execute("KafkaJsonStreaming")
  }
}
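Gson maps JSON fields onto the People case class by name, so an input message would look like this (hypothetical sample):

{"name": "Tom", "age": 18, "sex": "male"}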

7. Flink reading a Kafka stream and writing it back to Kafka

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010, FlinkKafkaProducer010}
import org.apache.flink.streaming.api.scala._

object KafkaToKafkaStreaming {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val p = new Properties()
    p.setProperty("bootstrap.servers", "localhost:9092")
    p.setProperty("group.id", "test")

    val input = env.addSource(new FlinkKafkaConsumer010[String]("test", new SimpleStringSchema(), p))

    input.print()

    val p2 = new Properties()
    p2.setProperty("bootstrap.servers", "localhost:9092")
    // zookeeper.connect and group.id are consumer-side settings; the
    // producer itself only needs bootstrap.servers
    p2.setProperty("zookeeper.connect", "localhost:2181")
    p2.setProperty("group.id", "test")
    // Note: this writes to the same "test" topic the job consumes from,
    // so the job will keep re-reading its own output
    input.addSink(new FlinkKafkaProducer010[String]("test", new SimpleStringSchema(), p2))

    env.execute("KafkaToKafkaStreaming")
  }
}
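Because the job consumes and produces the same test topic, it re-ingests its own output. A minimal sketch of the usual fix, writing to a separate output topic (the name test-out is hypothetical):

// Sketch: sink to a different topic so the job does not loop on its own output
input.addSink(new FlinkKafkaProducer010[String]("test-out", new SimpleStringSchema(), p2))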

For details, see the author's original blog; the full project is on GitHub.
---------------------
[Repost, shared for learning only; will be removed on request]
Author: 张行之
Original: https://blog.csdn.net/qq_33689414/article/details/86626746
Copyright notice: This is the author's original article; please include a link to the original post when reposting.
