A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

 找回密码
 加入黑马

QQ登录

只需一步,快速开始

Spark系列——从零学习Scala并发编程
1. 什么是Scala Actor
Scala的Actor类似于Java中的多线程编程。但Scala Actor提供的模型与Java多线程有所不同,它提供了一种消息通讯
的机制来实现线程间的数据共享,从而避免了Java多线程间操作同一个数据所带来的不安全性问题。
Actor之间是一个个独立的实体,他们之间是毫无关联的。但是,他们可以通过消息来通信。一个Actor收到其他
Actor的信息后,它可以根据需要作出各种相应。消息的类型可以是任意的,消息的内容也可以是任意的。
接下来,我们使用一个简单的例子来带你了解Actor , 并在例子后面为您解释其运行的过程。
2. 一个例子入门Actor
首先,Actor是scala自带的API,所以我们不用导入其他的包。接着,我们来实现第一个demo,我们想在主线程中启
动一个新的线程。用Actor开发步骤如下:
1. 创建一个单例对象继承Actor。重写 act() 和 exit()。
2. 在main方法中调用该对象的start()。
结果如下:
从上面的例子中我们可以看出Actor有点类似Java的Runnable,而 act( ) 有点类似于 run( )。同时,启动线程的方法
都是start( )。
3. 一个例子学会Actor间的通讯
object Actor1 extends Actor {
 override def act(): Unit = {
  println("Actor线程:"+Thread.currentThread().getName)
}
 override def exit(): Nothing = {
  println("actor exit")
  super.exit()
}
}
object ActorTest extends App {
 println("主线程:"+Thread.currentThread().getName)
 Actor1.start()
}
主线程:main
Actor线程:ForkJoinPool-1-worker-13
actor exit
北京市昌平区建材城西路金燕龙办公楼一层  电话:400-618-9090
上文中我们通过 start()首先启动了线程,然后线程的执行代码放在 act()中。接着,在启动线程后可能要往线程
里面发送请求(可能请求需要携带请求数据),线程接收到数据后需要进行处理,代码如下:
(1). 发送数据的代码格式是  Actor1 ! xxx 。 ! 表示发送异步消息,该方法没有返回值; !? 表示发送同步消
息,等待返回值; !! 发送异步消息,返回值是 Future[Any]。发送的数据可以是一个简单的字符串,也可以是
一个样例类。
(2). 在act()内部,使用 receive + 匹配模式 接收数据,如果接收的是一个类型的数据,只要一个case语句;但如
果接收的是多个类型的数据,则需要在外层嵌套一个死循环,否则只会接收一条数据。
(3). 在子线程接处理完数据后,如果想给主线程返回处理结果,可以拿到子线程内部的sender对象发送结果;主线
程等到接收数据,如果有数据返回,则读取数据,代码如上第4步。
4. Actor的总结与未来
case class SubmitTask(id: String)
case class ReplyTask(msg: String)
object Actor1 extends Actor {
 override def act(): Unit = {
  //4.死循环,不断监听并接收数据
  while (true) {
   //3.通过匹配模式接收数据
   receive {
    case "study" => println("从零学习Scala并发编程...")
    case SubmitTask(id) => println(s"id is $id")
    case "request" =>{
     //todo 这里写业务逻辑
     //请求完毕,往主线程返回数据
     sender ! ReplyTask("success")
   }
  }
 }
}
 override def exit(): Nothing = {
  println("actor exit")
  super.exit()
}
}
object ActorTest extends App {
 Actor1.start()
 //1.发送一个简单的字符串
 Actor1 ! "study"
 //2.发送一个复杂的类型
 Actor1 ! SubmitTask(UUID.randomUUID().toString)
 //3.发送一个请求并等待一个结果
 val resultFuture: Future[Any] = Actor1 !! "request"
 //4.读取返回数据
 val result = resultFuture.apply().asInstanceOf[ReplyTask].msg
 println(result)
}

Actor是一种基于事件的轻量级线程,在以前的并发模型中,我们需要关注共享的数据结构,而使用Actor则需要关注操作数据的代码结构,因为减少了数据的共享。Actor的主要能力来源于消息传递,而不是采用阻塞调用的处理形式。如果创建直接或间接扩展 Actor的类,要确保对对象的所有调用都通过消息传递进行。
我们现在学的Scala Actor是scala 2.10.x版本及以前版本的Actor。Scala在2.11.x版本中将Akka加入其中,作为其默认的Actor,老版本的Actor已经废弃。

0 个回复

您需要登录后才可以回帖 登录 | 加入黑马