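1. Concurrent programming
  • In Java, multiple threads that access shared data can run into thread-safety problems
  • Concurrency in Scala uses a different communication mechanism
    • Communication happens by sending messages, with no shared data, and that is how concurrent programming is achieved
  • Scala uses the Akka framework, and Akka achieves high concurrency through the Actor model
    • Akka is a framework written in Scala for building highly concurrent programs
    • Akka's concurrency comes from message passing between Actors
  • The Akka model
    • Message passing
    • Concurrency (by default, an Actor's mailbox is processed in FIFO order)
    • Fault tolerance
  • Actors are created by an ActorSystem
    • The ActorSystem is used as a singleton
    • Actors have many instances
    • One JVM needs only one ActorSystem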
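
To make the "messages instead of shared data" idea concrete, here is a minimal sketch in plain Scala. It is a toy and not how Akka is implemented: `ToyActor`, `awaitProcessed` and `ToyActorDemo` are hypothetical names made up for this illustration. One thread owns the state, and other threads only put messages into a FIFO mailbox.

```scala
import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue}
import scala.collection.mutable.ArrayBuffer

// Hypothetical toy, NOT Akka's implementation: one mailbox, one consumer thread.
final class ToyActor(expected: Int) {
  private val mailbox = new LinkedBlockingQueue[String]()
  private val done = new CountDownLatch(expected)
  // Written only by the consumer thread, so no lock is needed.
  private val processed = ArrayBuffer.empty[String]

  private val worker = new Thread(new Runnable {
    override def run(): Unit = {
      var i = 0
      while (i < expected) {
        processed += mailbox.take() // blocks until a message arrives
        done.countDown()
        i += 1
      }
    }
  })
  worker.setDaemon(true)
  worker.start()

  // Fire-and-forget send, loosely analogous to `actor ! msg`.
  def !(msg: String): Unit = mailbox.put(msg)

  // Waits until all expected messages were handled, then returns them in handling order.
  def awaitProcessed(): List[String] = {
    done.await()
    processed.toList
  }
}

object ToyActorDemo {
  def run(): List[String] = {
    val actor = new ToyActor(expected = 3)
    actor ! "a"
    actor ! "b"
    actor ! "c"
    actor.awaitProcessed()
  }

  def main(args: Array[String]): Unit = println(run()) // List(a, b, c)
}
```

Because only the consumer thread touches `processed`, the senders never need synchronization; they only hand messages to the queue.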

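Type aliases
[Scala]
object Alias {
    def main(args: Array[String]): Unit = {
        val arr1: Array[Int] = Array(1, 2, 3, 4, 5)
        // `type` declares an alias: myArr is just another name for Array[Int]
        type myArr = Array[Int]
        // arr2 has exactly the same type as arr1
        val arr2: myArr = Array(1, 2, 3, 4, 5)
    }
}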
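
Type aliases matter for Akka because an actor's `Receive` type is itself an alias for `PartialFunction[Any, Unit]`. The sketch below declares the same alias locally so that it runs without Akka on the classpath; `ReceiveAliasDemo`, `deliver` and `handled` are hypothetical names used only for illustration.

```scala
import scala.collection.mutable.ListBuffer

object ReceiveAliasDemo {
  // Same shape as Akka's alias, redeclared here so the sketch has no Akka dependency.
  type Receive = PartialFunction[Any, Unit]

  private val seen = ListBuffer.empty[String]

  // A block of `case` clauses is a partial function: it is defined only for matching inputs.
  val receive: Receive = {
    case msg: String => seen += s"string: $msg"
    case n: Int      => seen += s"int: $n"
  }

  // Applies the handler if it is defined for msg; reports whether the message was handled.
  def deliver(msg: Any): Boolean =
    if (receive.isDefinedAt(msg)) { receive(msg); true } else false

  def handled: List[String] = seen.toList
}
```

Calling `ReceiveAliasDemo.deliver("hi")` returns `true`, while `ReceiveAliasDemo.deliver(1.5)` returns `false` because no `case` matches a `Double`.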


Akka examples
  • Create a Maven project and add the dependencies
[XML]
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.study</groupId>
    <artifactId>ScalaDemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- Define some constants -->
    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.8</scala.version>
        <scala.compat.version>2.11</scala.compat.version>
        <akka.version>2.4.17</akka.version>
    </properties>

    <dependencies>
        <!-- Scala library dependency -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- Akka actor dependency -->
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-actor_${scala.compat.version}</artifactId>
            <version>${akka.version}</version>
        </dependency>
        <!-- Actor communication between processes -->
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-remote_${scala.compat.version}</artifactId>
            <version>${akka.version}</version>
        </dependency>
    </dependencies>

    <!-- Plugins -->
    <build>
        <plugins>
            <!-- Plugin that compiles Scala -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <!-- Maven packaging plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                                <!-- Specify the main class -->
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>


  • HelloActor: a first example
[Scala]
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}

class HelloActor extends Actor {
    // type Receive = scala.PartialFunction[scala.Any, scala.Unit]
    override def receive: Receive = {
        case msg: String => println(msg)
    }
}

object HelloActor {
    val actorSystemName = "helloActorSystem"
    val actorName = "helloActor"

    def main(args: Array[String]): Unit = {
        val host = "127.0.0.1"
        val port = 8888
        // Remoting configuration: provider, host and port
        val configStr =
            s"""
             |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
             |akka.remote.netty.tcp.hostname = ${host}
             |akka.remote.netty.tcp.port = ${port}
            """.stripMargin
        // Parse the string into a Config instance
        val config: Config = ConfigFactory.parseString(configStr)
        // Create the ActorSystem
        val actorSystem: ActorSystem = ActorSystem.create(actorSystemName, config)
        // Create the actor, then send it a message; `!` is fire-and-forget
        val actor: ActorRef = actorSystem.actorOf(Props(new HelloActor()), actorName)
        actor ! "Hello Dog!"
    }
}
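
The same three-line remoting configuration is built in every `main` in this post. One possible way to avoid the repetition is a small helper that returns the configuration text. This is only a sketch: `RemoteConfigText` and `remoteConfig` are hypothetical names, and the host is quoted here as a precaution, which the original code does not do.

```scala
object RemoteConfigText {
  // Builds the remoting settings used throughout this post as HOCON text.
  // The result is meant to be passed to ConfigFactory.parseString.
  def remoteConfig(host: String, port: Int): String =
    s"""
       |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
       |akka.remote.netty.tcp.hostname = "$host"
       |akka.remote.netty.tcp.port = $port
       |""".stripMargin
}
```

With this in place, a `main` method could call `ConfigFactory.parseString(RemoteConfigText.remoteConfig("127.0.0.1", 8888))` instead of repeating the interpolated string.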

  • Actors talking to each other

[Scala]
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}

class Ai extends Actor {
  override def receive: Receive = {
    case str: String => {
      println(s"Ai received: ${str}")
      // sender() is the actor that sent the message currently being handled
      sender() ! "Reply from Ai"
    }
  }
}

object Ai {
  val actorSystemName = "Ai_system"
  val actorName = "Ai_actor"

  def main(args: Array[String]): Unit = {
    val host = "127.0.0.1"
    val port = 9999
    // Remoting configuration
    val str =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = ${host}
         |akka.remote.netty.tcp.port = ${port}
        """.stripMargin
    val config: Config = ConfigFactory.parseString(str)
    val actorSystem: ActorSystem = ActorSystem.create(actorSystemName, config)
    actorSystem.actorOf(Props(new Ai()), actorName)
  }
}

[Scala]
import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}

class Horse extends Actor {
  override def receive: Receive = {
    case str: String => {
      println(s"Received Ai's reply: ${str}")
      // Ai answers every String, so the two actors keep exchanging messages indefinitely
      sender() ! "Hello, Ai"
    }
  }

  // Runs once before the actor handles its first message
  override def preStart(): Unit = {
    // Remote address of the Ai actor; Ai must already be running on 127.0.0.1:9999
    val path = s"akka.tcp://${Ai.actorSystemName}@127.0.0.1:9999/user/${Ai.actorName}"
    val actorProxy: ActorSelection = context.actorSelection(path)
    println(s"Obtained a proxy for ${Ai.actorName}")
    actorProxy ! "Hello, Ai"
  }
}

object Horse {
  val actorSystemName = "Horse_system"
  val actorName = "Horse_actor"

  def main(args: Array[String]): Unit = {
    val host = "127.0.0.1"
    val port = 8888
    // Remoting configuration
    val str =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = ${host}
         |akka.remote.netty.tcp.port = ${port}
        """.stripMargin
    // Parse the ActorSystem configuration
    val config: Config = ConfigFactory.parseString(str)
    // Create an ActorSystem instance
    val actorSystem: ActorSystem = ActorSystem.create(actorSystemName, config)
    actorSystem.actorOf(Props(new Horse()), actorName)
  }
}
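
The remote address that Horse passes to `actorSelection` follows a fixed pattern: protocol, ActorSystem name, host, port, then `/user/` and the actor name (actors created with `actorSystem.actorOf` live under `/user`). A small helper makes the parts explicit. This is a sketch only: `ActorPaths` and `remoteUserPath` are hypothetical names and not part of Akka.

```scala
object ActorPaths {
  // Builds an akka.tcp address for a top-level actor in a remote ActorSystem.
  def remoteUserPath(system: String, host: String, port: Int, actor: String): String =
    s"akka.tcp://$system@$host:$port/user/$actor"
}
```

For the Ai actor in this post, `ActorPaths.remoteUserPath("Ai_system", "127.0.0.1", 9999, "Ai_actor")` yields `akka.tcp://Ai_system@127.0.0.1:9999/user/Ai_actor`.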


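Spark's communication mechanism
  • HDFS: NameNode + DataNode
  • Spark: Master + Worker
The basic mechanism of Spark RPC communication
  • The Master starts first, then each Worker connects to the Master
  • The Worker registers with the Master
    • Resources (cores, memory)
    • workerId
    • The information is wrapped in a case class
  • When the Master receives a Worker's message, it stores the workerId
    • In a collection: Map[workerId, Info class]
  • The Master replies with a registration-success message
    • A case object
  • After receiving the registration-success message, the Worker sends heartbeats periodically
    • Scheduled task: case class(workerId)
  • The Master runs a scheduled task, started as soon as the Master itself starts
    • It removes Workers whose heartbeats have timed out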
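
The bookkeeping on the Master side can be tried out without Akka. The sketch below models only the registry logic from the steps above (register, heartbeat, expiry check) and takes the current time as a parameter so that the behaviour is deterministic. `RegistrySketch`, `Register`, `Info` and `Registry` are hypothetical names, and the 30-second timeout is just an example value.

```scala
import scala.collection.mutable

object RegistrySketch {
  // Registration message: worker id plus the resources it offers.
  final case class Register(workerId: String, cores: Int, memoryMb: Int)

  // What the master remembers about each worker.
  final case class Info(cores: Int, memoryMb: Int, var lastHeartbeat: Long)

  final class Registry(timeoutMs: Long) {
    private val workers = mutable.HashMap.empty[String, Info]

    // Registration counts as the first heartbeat.
    def register(msg: Register, now: Long): Unit =
      workers.put(msg.workerId, Info(msg.cores, msg.memoryMb, now))

    // Returns false for unknown workers (for example, ones already expired).
    def heartbeat(workerId: String, now: Long): Boolean =
      workers.get(workerId) match {
        case Some(info) =>
          info.lastHeartbeat = now
          true
        case None => false
      }

    // Removes workers whose last heartbeat is older than timeoutMs and returns their ids.
    def removeExpired(now: Long): Set[String] = {
      val dead = workers.collect {
        case (id, info) if now - info.lastHeartbeat > timeoutMs => id
      }.toSet
      dead.foreach(id => workers.remove(id))
      dead
    }

    def size: Int = workers.size
  }

  def demo(): (Set[String], Int) = {
    val registry = new Registry(timeoutMs = 30000L)
    registry.register(Register("w1", 4, 8192), now = 0L)
    registry.register(Register("w2", 2, 4096), now = 0L)
    registry.heartbeat("w1", now = 20000L)   // only w1 stays alive
    (registry.removeExpired(now = 35000L), registry.size)
  }
}
```

In `demo()`, worker `w2` never sends a heartbeat after registering, so the check at 35 seconds removes it and one worker remains.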

[Scala]
object RpcMessage {
}

// Sent by a worker to register with the master
case class Worker2Master(workerId: String)

// Sent by the master to a worker to confirm the registration
case object Registered

// Heartbeat sent periodically from a worker to the master
case class HeartBeat(workerId: String)

// Sent by the master's timer to the master itself to check for timed-out workers
case object CheckWorkerStataus


[Scala]
import java.util.UUID
import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}

class Worker(val masterHost: String, val masterPort: String) extends Actor {
  // Unique id for this worker
  val workerId: String = UUID.randomUUID().toString.replaceAll("-", "")

  override def receive: Receive = {
    case Registered => {
      // The master confirmed the registration
      println(s"worker(${workerId}) received the master's reply: registered")
      // Send a heartbeat to the master (the sender of Registered) every 10 seconds
      import scala.concurrent.duration._ // time units such as `seconds`
      import context.dispatcher // ExecutionContext required by the scheduler
      context.system.scheduler.schedule(0.seconds, 10.seconds, sender(), HeartBeat(workerId))
    }
  }

  override def preStart(): Unit = {
    val path = s"akka.tcp://${Master.actorSystemName}@${masterHost}:${masterPort}/user/${Master.actorName}"
    val masterProxy: ActorSelection = context.actorSelection(path)
    println(s"worker(${workerId}) created, obtained a proxy for the master")
    // Register with the master
    masterProxy ! Worker2Master(workerId)
  }
}

object Worker {
  val actorSystemName = "Worker_System"
  val actorName = "Worker_Actor"

  def main(args: Array[String]): Unit = {
    // Validate the arguments
    if (args.length != 4) {
      println("Usage: com.study.sparkrpc.Worker <master host> <master port> <host> <port>")
      sys.exit(1)
    }
    val Array(masterHost, masterPort, host, port) = args
    val str =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = ${host}
         |akka.remote.netty.tcp.port = ${port}
        """.stripMargin
    val config: Config = ConfigFactory.parseString(str)
    val actorSystem: ActorSystem = ActorSystem.create(actorSystemName, config)
    actorSystem.actorOf(Props(new Worker(masterHost, masterPort)), actorName)
  }
}


[Scala]
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}

import scala.collection.mutable

// WorkerInfo is not shown in the original post; this definition is an assumption
// inferred from how the Master uses it below.
class WorkerInfo(val workerId: String) {
  var lastHeartBeatTime: Long = 0L
}

class Master extends Actor {
  // Stores the WorkerInfo of every registered worker
  val workerMap = new mutable.HashMap[String, WorkerInfo]()

  override def preStart(): Unit = {
    import scala.concurrent.duration._
    import context.dispatcher
    // Every 13 seconds, send CheckWorkerStataus to this actor itself
    context.system.scheduler.schedule(0.seconds, 13.seconds, self, CheckWorkerStataus)
  }

  override def receive: Receive = {
    case Worker2Master(workerId) => {
      println(s"master received a registration from worker(${workerId})")
      val info = new WorkerInfo(workerId)
      info.lastHeartBeatTime = System.currentTimeMillis()
      workerMap.put(workerId, info)
      sender() ! Registered
    }
    case HeartBeat(workerId) => {
      // Ignore heartbeats from unknown or already removed workers instead of throwing
      workerMap.get(workerId).foreach(_.lastHeartBeatTime = System.currentTimeMillis())
      println(s"master received a heartbeat from worker(${workerId})")
    }
    case CheckWorkerStataus => {
      println("master is checking for expired workers")
      // A worker is considered dead after 30 seconds without a heartbeat
      val deadWorks: mutable.HashMap[String, WorkerInfo] = workerMap.filter(tp => System.currentTimeMillis() - tp._2.lastHeartBeatTime > 30 * 1000)
      deadWorks.foreach(tp => workerMap.remove(tp._1))
      // Equivalent: workerMap --= deadWorks.keys
      println(s"number of registered workers: ${workerMap.size}")
    }
  }
}

object Master {
  val actorSystemName = "Master_System"
  val actorName = "Master_Actor"

  def main(args: Array[String]): Unit = {
    // Validate the arguments
    if (args.length != 2) {
      println("Usage: com.study.sparkrpc.Master <masterHost> <masterPort>")
      sys.exit(1)
    }
    val Array(host, port) = args
    val str =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = ${host}
         |akka.remote.netty.tcp.port = ${port}
        """.stripMargin
    val config: Config = ConfigFactory.parseString(str)
    val actorSystem: ActorSystem = ActorSystem.create(actorSystemName, config)
    actorSystem.actorOf(Props(new Master()), actorName)
  }
}




