当前位置:K88软件开发文章中心编程语言SQLscala → 文章内容

Scala 并发编程

减小字体 增大字体 作者:佚名  来源:网上搜集  发布时间:2019-1-19 4:45:34

由 Shaodengdeng 创建,小路依依 最后一次修改 2016-08-12 Scala 并发编程Runnable/CallableRunnable 接口只有一个没有返回值的方法。trait Runnable { def run(): Unit}Callable与之类似,除了它有一个返回值trait Callable[V] { def call(): V}线程Scala 并发是建立在 Java 并发模型基础上的。在 Sun JVM 上,对 IO 密集的任务,我们可以在一台机器运行成千上万个线程。一个线程需要一个 Runnable。你必须调用线程的 start 方法来运行 Runnable。scala> val hello = new Thread(new Runnable { def run() { println("hello world") }})hello: java.lang.Thread = Thread[Thread-3,5,main]scala> hello.starthello world当你看到一个类实现了 Runnable 接口,你就知道它的目的是运行在一个线程中。单线程代码这里有一个可以工作但有问题的代码片断。import java.net.{Socket, ServerSocket}import java.util.concurrent.{Executors, ExecutorService}import java.util.Dateclass NetworkService(port: Int, poolSize: Int) extends Runnable { val serverSocket = new ServerSocket(port) def run() { while (true) { // This will block until a connection comes in. val socket = serverSocket.accept() (new Handler(socket)).run() } }}class Handler(socket: Socket) extends Runnable { def message = (Thread.currentThread.getName() + "\n").getBytes def run() { socket.getOutputStream.write(message) socket.getOutputStream.close() }}(new NetworkService(2020, 2)).run每个请求都会回应当前线程的名称,所以结果始终是 main 。这段代码的主要缺点是在同一时间,只有一个请求可以被相应!你可以把每个请求放入一个线程中处理。只要简单改变(new Handler(socket)).run()为(new Thread(new Handler(socket))).start()但如果你想重用线程或者对线程的行为有其他策略呢?Executors随着 Java 5 的发布,它决定提供一个针对线程的更抽象的接口。你可以通过 Executors 对象的静态方法得到一个 ExecutorService 对象。这些方法为你提供了可以通过各种政策配置的 ExecutorService ,如线程池。下面改写我们之前的阻塞式网络服务器来允许并发请求。import java.net.{Socket, ServerSocket}import java.util.concurrent.{Executors, ExecutorService}import java.util.Dateclass NetworkService(port: Int, poolSize: Int) extends Runnable { val serverSocket = new ServerSocket(port) val pool: ExecutorService = Executors.newFixedThreadPool(poolSize) def run() { try { while (true) { // This will block until a connection comes in. val socket = serverSocket.accept() pool.execute(new Handler(socket)) } } finally { pool.shutdown() } }}class Handler(socket: Socket) extends Runnable { def message = (Thread.currentThread.getName() + "\n").getBytes def run() { socket.getOutputStream.write(message) socket.getOutputStream.close() }}(new NetworkService(2020, 2)).run这里有一个连接脚本展示了内部线程是如何重用的。$ nc localhost 2020pool-1-thread-1$ nc localhost 2020pool-1-thread-2$ nc localhost 2020pool-1-thread-1$ nc localhost 2020pool-1-thread-2FuturesFuture 代表异步计算。你可以把你的计算包装在 Future 中,当你需要计算结果的时候,你只需调用一个阻塞的 get() 方法就可以了。一个 Executor 返回一个 Future 。如果使用 Finagle RPC 系统,你可以使用 Future 实例持有可能尚未到达的结果。一个 FutureTask 是一个 Runnable 实现,就是被设计为由 Executor 运行的val future = new FutureTask[String](new Callable[String]() { def call(): String = { searcher.search(target);}})executor.execute(future)现在我需要结果,所以阻塞直到其完成。val blockingResult = future.get()参考 Scala School 的 Finagle 介绍中大量使用了 Future,包括一些把它们结合起来的不错的方法。以及 Effective Scala 对 [Futures](http://twitter.github.com/effectivescala/#Twitter's standard libraries-Futures) 的意见。线程安全问题class Person(var name: String) { def set(changedName: String) { name = changedName }}这个程序在多线程环境中是不安全的。如果有两个线程有引用到同一个 Person 实例,并调用 set ,你不能预测两个调用结束后 name 的结果。在 Java 内存模型中,允许每个处理器把值缓存在 L1 或 L2 缓存中,所以在不同处理器上运行的两个线程都可以有自己的数据视图。让我们来讨论一些工具,来使线程保持一致的数据视图。三种工具同步互斥锁(Mutex)提供所有权语义。当你进入一个互斥体,你拥有它。同步是 JVM 中使用互斥锁最常见的方式。在这个例子中,我们会同步 Person。在 JVM 中,你可以同步任何不为 null 的实例。class Person(var name: String) { def set(changedName: String) { this.synchronized { name = changedName } }}volatile随着 Java 5 内存模型的变化,volatile 和 synchronized 基本上是相同的,除了 volatile 允许空值。synchronized 允许更细粒度的锁。 而 volatile 则对每次访问同步。class Person(@volatile var name: String) { def set(changedName: String) { name = changedName }}AtomicReference此外,在 Java 5 中还添加了一系列低级别的并发原语。 AtomicReference 类是其中之一import java.util.concurrent.atomic.AtomicReferenceclass Person(val name: AtomicReference[String]) { def set(changedName: String) { name.set(changedName) }}这个成本是什么?AtomicReference 是这两种选择中最昂贵的,因为你必须去通过方法调度(method dispatch)来访问值。volatile 和 synchronized 是建立在 Java 的内置监视器基础上的。如果没有资源争用,监视器的成本很小。由于 synchronized 允许你进行更细粒度的控制权

[1] [2]  下一页


Scala 并发编程