1. 并发编程
- 在 Java 中,多线程访问共享数据时会存在【线程安全】问题
- Scala 的多线程使用了新的通信机制
- 通过发送消息来通信,没有了共享数据,从而实现并发编程
- Scala 使用的是 Akka 框架,Akka 通过 Actor 模式实现高并发
- Akka 是使用 Scala 语言编写的、用于高并发编程的框架
- Akka 的高并发是通过 Actor 与 Actor 之间的消息通信来实现的
- Akka 模型
- Actor 是由 ActorSystem 来创建的
- ActorSystem 通常按单例使用:它是重量级对象,一个应用一般只创建一个
- Actor 是多例的
- 一个 JVM 中,通常只需要一个 ActorSystem
类型别名
/** Demonstrates a type alias: `MyArr` is just another name for `Array[Int]`. */
object Alias {

  /** Alias for `Array[Int]`; values of the two types are fully interchangeable. */
  type MyArr = Array[Int]

  def main(args: Array[String]): Unit = {
    val arr1: Array[Int] = Array(1, 2, 3, 4, 5)
    // Declared through the alias, but the compiler sees exactly Array[Int].
    val arr2: MyArr = Array(1, 2, 3, 4, 5)
  }
}
Akka 案例
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.study</groupId>
    <artifactId>ScalaDemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- Shared constants (versions, encodings) referenced further down. -->
    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <!-- Standard Maven encoding properties. Plugins such as maven-resources-plugin read
             project.build.sourceEncoding and otherwise warn and fall back to the platform
             charset, which can garble the non-ASCII (Chinese) literals in the sources. -->
        <project.build.sourceEncoding>${encoding}</project.build.sourceEncoding>
        <project.reporting.outputEncoding>${encoding}</project.reporting.outputEncoding>
        <scala.version>2.11.8</scala.version>
        <scala.compat.version>2.11</scala.compat.version>
        <akka.version>2.4.17</akka.version>
        <!-- Main-Class written into the shaded jar's manifest. Empty by default because the
             project has several entry points; set it per build, e.g.
             mvn package -Dmain.class=<fully.qualified.MainObject> -->
        <main.class></main.class>
    </properties>

    <dependencies>
        <!-- Scala standard library -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- Akka actors -->
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-actor_${scala.compat.version}</artifactId>
            <version>${akka.version}</version>
        </dependency>
        <!-- Akka remoting: actor communication between separate processes -->
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-remote_${scala.compat.version}</artifactId>
            <version>${akka.version}</version>
        </dependency>
    </dependencies>

    <!-- Build plugins -->
    <build>
        <plugins>
            <!-- Compiles the Scala sources (main and test) -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <!-- Builds one self-contained ("fat") jar during the package phase -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <!-- Drop dependency signature files; they no longer match the merged jar. -->
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <!-- Concatenate every dependency's reference.conf (Akka keeps its
                                     default settings there) instead of keeping only one copy. -->
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                                <!-- Main-Class manifest entry, i.e. the object whose main method runs -->
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>${main.class}</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}

/** Minimal actor: prints every `String` message it receives; other message types are not matched. */
class HelloAcrot extends Actor {
  // `Receive` is Akka's alias for PartialFunction[Any, Unit].
  override def receive: Receive = {
    case msg: String => println(msg)
  }
}
/** Entry point: starts a remoting-enabled ActorSystem on 127.0.0.1:8888 and sends one greeting. */
object HelloAcrot {
  val actorSystemName = "helloActorSystem"
  val actorName = "helloActor"

  def main(args: Array[String]): Unit = {
    val (host, port) = ("127.0.0.1", 8888)

    // Remoting needs the RemoteActorRefProvider plus the address to bind to.
    val remotingConf: String =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = ${host}
         |akka.remote.netty.tcp.port = ${port}
         |""".stripMargin

    val system: ActorSystem =
      ActorSystem.create(actorSystemName, ConfigFactory.parseString(remotingConf))
    val greeter: ActorRef = system.actorOf(Props(new HelloAcrot()), actorName)

    // The ActorSystem is never terminated here, so the process keeps running after the send.
    greeter ! "Hello Dog!"
  }
}
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}

/**
 * Server side of the remote demo: logs each `String` it receives and answers
 * the sender with a fixed reply.
 */
class Ai extends Actor {
  override def receive: Receive = {
    case str: String =>
      println(s"Ai获取到消息${str}")
      // Replies to *every* String, so a peer that also answers every reply would loop forever.
      sender() ! "Ai回复你"
  }
}
/** Entry point: hosts the [[Ai]] actor in a remoting-enabled ActorSystem on 127.0.0.1:9999. */
object Ai {
  val actorSystemName = "Ai_system"
  val actorName = "Ai_actor"

  /** Builds the remoting part of the Akka configuration for the given bind address. */
  private def remoteConfig(bindHost: String, bindPort: Int): Config =
    ConfigFactory.parseString(
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = ${bindHost}
         |akka.remote.netty.tcp.port = ${bindPort}
         |""".stripMargin)

  def main(args: Array[String]): Unit = {
    val system = ActorSystem.create(actorSystemName, remoteConfig("127.0.0.1", 9999))
    system.actorOf(Props(new Ai()), actorName)
  }
}
import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}

/**
 * Client side of the remote demo: on start it sends a greeting to the remote [[Ai]] actor,
 * then prints every String reply it receives.
 *
 * @param aiHost host of Ai's ActorSystem (Ai.main binds 127.0.0.1)
 * @param aiPort port of Ai's ActorSystem (Ai.main binds 9999)
 */
class Horse(aiHost: String = "127.0.0.1", aiPort: Int = 9999) extends Actor {
  override def receive: Receive = {
    case str: String =>
      println(s"获取到Ai的回复:${str}")
      // Intentionally no reply: Ai answers every String it receives, so answering back
      // here would start an endless ping-pong flooding both processes.
  }

  override def preStart(): Unit = {
    val path = s"akka.tcp://${Ai.actorSystemName}@${aiHost}:${aiPort}/user/${Ai.actorName}"
    // actorSelection only builds a lookup handle; it does not verify the remote actor exists.
    val actorProxy: ActorSelection = context.actorSelection(path)
    println(s"获取${Ai.actorName}代理对象成功!")
    actorProxy ! "Ai你好"
  }
}
/** Entry point: starts a remoting-enabled ActorSystem on 127.0.0.1:8888 hosting a [[Horse]]. */
object Horse {
  val actorSystemName = "Horse_system"
  val actorName = "Horse_actor"

  def main(args: Array[String]): Unit = {
    val bindHost = "127.0.0.1"
    // NOTE: HelloAcrot.main binds this same port, so the two demos cannot run side by side.
    val bindPort = 8888

    val remoting =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = ${bindHost}
         |akka.remote.netty.tcp.port = ${bindPort}
         |""".stripMargin

    // Starting the Horse actor runs its preStart, which contacts the remote Ai actor.
    ActorSystem
      .create(actorSystemName, ConfigFactory.parseString(remoting))
      .actorOf(Props(new Horse()), actorName)
  }
}
Spark 通信机制
- HDFS: NameNode + DataNode
- Spark: Master + Worker
Spark RPC 通信的基本机制
- Master 先启动,然后 Worker 向 Master 建立连接
- Worker 向 Master 注册
- 资源(cores,memory)
- workerId
- 用 case class 封装相关的信息
- Master 接收 Worker 的消息后,保存 workerId
- Master 返回注册成功消息
- Worker 接收到注册成功消息后,定时向 Master 发送心跳信息
- 心跳消息同样用 case class 封装,携带 workerId
- Master 启动后立即开启定时任务,定期检查并移除心跳超时的 Worker
/** Messages exchanged between Master and Worker in the Spark-style RPC demo. */
object RpcMessage {
}

// Worker -> Master: registration request carrying the worker's id.
case class Worker2Master(workerId: String)

// Master -> Worker: the registration succeeded.
case object Registered

// Worker -> Master: heartbeat the worker sends periodically.
case class HeartBeat(workerId: String)

// Master -> itself: periodic trigger to check for workers whose heartbeat has timed out.
case object CheckWorkerStataus
// NOTE(review): this listing repeats the RpcMessage listing verbatim. Master relies on a
// WorkerInfo class that is not shown in this article — presumably this slot was meant for it.

/** Messages exchanged between Master and Worker in the Spark-style RPC demo. */
object RpcMessage {
}

// Worker -> Master: registration request carrying the worker's id.
case class Worker2Master(workerId: String)

// Master -> Worker: the registration succeeded.
case object Registered

// Worker -> Master: heartbeat the worker sends periodically.
case class HeartBeat(workerId: String)

// Master -> itself: periodic trigger to check for workers whose heartbeat has timed out.
case object CheckWorkerStataus
import java.util.UUID
import akka.actor.{Actor, ActorSelection, ActorSystem, Cancellable, Props}
import com.typesafe.config.{Config, ConfigFactory}

/**
 * Worker side of the Spark-style RPC demo.
 *
 * On start it looks up the Master and sends `Worker2Master(workerId)`. Once the Master
 * answers `Registered`, it sends `HeartBeat(workerId)` to the Master every 10 seconds.
 *
 * @param masterHost host the Master's ActorSystem is bound to
 * @param masterPort port the Master's ActorSystem is bound to (raw command-line string)
 */
class Worker(val masterHost: String, val masterPort: String) extends Actor {
  import scala.concurrent.duration._

  // Random UUID with the dashes removed; identifies this worker to the Master.
  val workerId: String = UUID.randomUUID().toString.replaceAll("-", "")

  // Heartbeat timer handle. It is cancelled on re-registration, so timers do not pile up,
  // and in postStop, so heartbeats stop once this actor is gone.
  private var heartbeat: Option[Cancellable] = None

  override def receive: Receive = {
    case Registered =>
      // The Master accepted the registration.
      println(s"worker(${workerId})获取到master的响应,注册成功!")
      // The scheduler takes an implicit ExecutionContext; use this actor's dispatcher.
      import context.dispatcher
      heartbeat.foreach(_.cancel())
      // sender() is evaluated now, i.e. it is the Master that sent Registered.
      heartbeat = Some(
        context.system.scheduler.schedule(0.seconds, 10.seconds, sender(), HeartBeat(workerId)))
  }

  override def preStart(): Unit = {
    val path = s"akka.tcp://${Master.actorSystemName}@${masterHost}:${masterPort}/user/${Master.actorName}"
    // actorSelection only builds a lookup handle; it does not check that the Master is reachable.
    val masterProxy: ActorSelection = context.actorSelection(path)
    println(s"worker(${workerId})创建,成功获取 master 代理")
    masterProxy ! Worker2Master(workerId)
  }

  override def postStop(): Unit = heartbeat.foreach(_.cancel())
}
/** Entry point: `Worker <master host> <master port> <host> <port>`; hosts one [[Worker]] actor. */
object Worker {
  val actorSystemName = "Worker_System"
  val actorName = "Worker_Actor"

  def main(args: Array[String]): Unit = {
    // Validate the arguments.
    if (args.length != 4) {
      println("Usage:com.study.sparkrpc.Worker<master host><master port><host><port>")
      sys.exit(1)
    }
    val Array(masterHost, masterPort, host, port) = args
    // The bind hostname comes from the command line, so it is quoted: an unquoted HOCON
    // value may not contain characters such as ':' (e.g. in an IPv6 address).
    val str =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "${host}"
         |akka.remote.netty.tcp.port = ${port}
         |""".stripMargin
    val config: Config = ConfigFactory.parseString(str)
    val actorSystem: ActorSystem = ActorSystem.create(actorSystemName, config)
    actorSystem.actorOf(Props(new Worker(masterHost, masterPort)), actorName)
  }
}
import akka.actor.{Actor, ActorSystem, Cancellable, Props}
import com.typesafe.config.{Config, ConfigFactory}
import scala.collection.mutable
import scala.concurrent.duration._

/**
 * Master side of the Spark-style RPC demo.
 *
 * Registers workers by id, refreshes a worker's timestamp on every heartbeat and, on a
 * periodic timer, drops workers whose last heartbeat is older than the timeout.
 *
 * NOTE(review): WorkerInfo is not defined in this listing. It must take the worker id in its
 * constructor and expose a mutable `lastHeartBeatTime` holding epoch milliseconds.
 *
 * @param checkInterval       period of the dead-worker check; the first check runs immediately
 * @param workerTimeoutMillis a worker is dropped once more than this many ms pass without a heartbeat
 */
class Master(checkInterval: FiniteDuration = 13.seconds,
             workerTimeoutMillis: Long = 30 * 1000L) extends Actor {

  // All registered workers, keyed by workerId.
  val workerMap = new mutable.HashMap[String, WorkerInfo]()

  // Timer sending CheckWorkerStataus to self. It is cancelled in postStop (which the default
  // preRestart also calls), so a restart does not leave a second timer running.
  private var checkTimer: Option[Cancellable] = None

  override def preStart(): Unit = {
    import context.dispatcher
    checkTimer = Some(
      context.system.scheduler.schedule(0.seconds, checkInterval, self, CheckWorkerStataus))
  }

  override def postStop(): Unit = checkTimer.foreach(_.cancel())

  override def receive: Receive = {
    case Worker2Master(workerId) =>
      println(s"master 接收到 worker(${workerId}) 的注册")
      val info = new WorkerInfo(workerId)
      info.lastHeartBeatTime = System.currentTimeMillis()
      workerMap.put(workerId, info)
      sender() ! Registered

    case HeartBeat(workerId) =>
      workerMap.get(workerId) match {
        case Some(info) =>
          info.lastHeartBeatTime = System.currentTimeMillis()
          println(s"master 接收到 worker(${workerId})的心跳")
        case None =>
          // Heartbeat from a worker that is not (or no longer) registered, e.g. one already
          // dropped as dead. workerMap(workerId) would throw here; under the default
          // supervisor that restarts this actor and discards the whole registry.
          println(s"master 收到未注册 worker(${workerId}) 的心跳,已忽略")
      }

    case CheckWorkerStataus =>
      println("master 检查过期的 worker")
      // One timestamp for the whole sweep, so every worker is judged against the same instant.
      val now = System.currentTimeMillis()
      val deadWorkerIds = workerMap.collect {
        case (id, info) if now - info.lastHeartBeatTime > workerTimeoutMillis => id
      }.toList
      workerMap --= deadWorkerIds
      println(s"master 的大小为:${workerMap.size}")
  }
}
/** Entry point: `Master <masterHost> <masterPort>`; hosts the [[Master]] actor. */
object Master {
  val actorSystemName = "Master_System"
  val actorName = "Master_Actor"

  def main(args: Array[String]): Unit = {
    // Validate the arguments.
    if (args.length != 2) {
      println("Usage:com.study.sparkrpc.Master<masterHost><masterPort>")
      sys.exit(1)
    }
    val Array(host, port) = args
    // The bind hostname comes from the command line, so it is quoted: an unquoted HOCON
    // value may not contain characters such as ':' (e.g. in an IPv6 address).
    val str =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "${host}"
         |akka.remote.netty.tcp.port = ${port}
         |""".stripMargin
    val config: Config = ConfigFactory.parseString(str)
    val actorSystem: ActorSystem = ActorSystem.create(actorSystemName, config)
    actorSystem.actorOf(Props(new Master()), actorName)
  }
}
|