当前位置:K88软件开发文章中心编程语言SQLscala → 文章内容

Scala 并发编程

减小字体 增大字体 作者:佚名  来源:网上搜集  发布时间:2019-1-19 4:45:34

,从而会有更少的争夺,所以 synchronized 往往是最好的选择。当你进入同步点,访问 volatile 引用,或去掉 AtomicReferences 引用时, Java 会强制处理器刷新其缓存线从而提供了一致的数据视图。如果我错了,请大家指正。这是一个复杂的课题,我敢肯定要弄清楚这一点需要一个漫长的课堂讨论。Java 5 的其他灵巧的工具正如前面提到的 AtomicReference ,Java 5 带来了许多很棒的工具。CountDownLatchCountDownLatch 是一个简单的多线程互相通信的机制。val doneSignal = new CountDownLatch(2)doAsyncWork(1)doAsyncWork(2)doneSignal.await()println("both workers finished!")先不说别的,这是一个优秀的单元测试。比方说,你正在做一些异步工作,并要确保功能完成。你的函数只需要 倒数计数(countDown) 并在测试中 等待(await) 就可以了。AtomicInteger/Long由于对 Int 和 Long 递增是一个经常用到的任务,所以增加了 AtomicInteger 和 AtomicLong 。AtomicBoolean我可能不需要解释这是什么。ReadWriteLocks读写锁(ReadWriteLock) 使你拥有了读线程和写线程的锁控制。当写线程获取锁的时候读线程只能等待。让我们构建一个不安全的搜索引擎下面是一个简单的倒排索引,它不是线程安全的。我们的倒排索引按名字映射到一个给定的用户。这里的代码天真地假设只有单个线程来访问。注意使用了 mutable.HashMap 替代了默认的构造函数 this()import scala.collection.mutablecase class User(name: String, id: Int)class InvertedIndex(val userMap: mutable.Map[String, User]) { def this() = this(new mutable.HashMap[String, User]) def tokenizeName(name: String): Seq[String] = { name.split(" ").map(_.toLowerCase) } def add(term: String, user: User) { userMap += term -> user } def add(user: User) { tokenizeName(user.name).foreach { term => add(term, user) } }}这里没有写如何从索引中获取用户。稍后我们会补充。让我们把它变为线程安全在上面的倒排索引例子中,userMap 不能保证是线程安全的。多个客户端可以同时尝试添加项目,并有可能出现前面 Person 例子中的视图错误。由于 userMap 不是线程安全的,那我们怎样保持在同一个时间只有一个线程能改变它呢?你可能会考虑在做添加操作时锁定 userMap。def add(user: User) { userMap.synchronized { tokenizeName(user.name).foreach { term => add(term, user) } }}不幸的是,这个粒度太粗了。一定要试图在互斥锁以外做尽可能多的耗时的工作。还记得我说过如果不存在资源争夺,锁开销就会很小吗。如果在锁代码块里面做的工作越少,争夺就会越少。def add(user: User) { // tokenizeName was measured to be the most expensive operation. val tokens = tokenizeName(user.name) tokens.foreach { term => userMap.synchronized { add(term, user) } }}SynchronizedMap我们可以通过 SynchronizedMap 特质将同步混入一个可变的 HashMap。我们可以扩展现有的 InvertedIndex,提供给用户一个简单的方式来构建同步索引。import scala.collection.mutable.SynchronizedMapclass SynchronizedInvertedIndex(userMap: mutable.Map[String, User]) extends InvertedIndex(userMap) { def this() = this(new mutable.HashMap[String, User] with SynchronizedMap[String, User])}如果你看一下其实现,你就会意识到,它只是在每个方法上加同步锁来保证其安全性,所以它很可能没有你希望的性能。Java ConcurrentHashMapJava 有一个很好的线程安全的 ConcurrentHashMap。值得庆幸的是,我们可以通过 JavaConverters 获得不错的 Scala 语义。事实上,我们可以通过扩展老的不安全的代码,来无缝地接入新的线程安全 InvertedIndex。import java.util.concurrent.ConcurrentHashMapimport scala.collection.JavaConverters._class ConcurrentInvertedIndex(userMap: collection.mutable.ConcurrentMap[String, User]) extends InvertedIndex(userMap) { def this() = this(new ConcurrentHashMap[String, User] asScala)}让我们加载 InvertedIndex原始方式trait UserMaker { def makeUser(line: String) = line.split(",") match { case Array(name, userid) => User(name, userid.trim().toInt) }}class FileRecordProducer(path: String) extends UserMaker { def run() { Source.fromFile(path, "utf-8").getLines.foreach { line => index.add(makeUser(line)) } }}对于文件中的每一行,我们可以调用 makeUser 然后 add 到 InvertedIndex中。如果我们使用并发 InvertedIndex,我们可以并行调用 add 因为 makeUser 没有副作用,所以我们的代码已经是线程安全的了。我们不能并行读取文件,但我们可以并行构造用户并且把它添加到索引中。一个解决方案:生产者/消费者异步计算的一个常见模式是把消费者和生产者分开,让他们只能通过队列(Queue) 沟通。让我们看看如何将这个模式应用在我们的搜索引擎索引中。import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}// Concrete producerclass Producer[T](path: String, queue: BlockingQueue[T]) extends Runnable { def run() { Source.fromFile(path, "utf-8").getLines.foreach { line => queue.put(line) } }}// Abstract consumerabstract class Consumer[T](queue: BlockingQueue[T]) extends Runnable { def run() { while (true) { val item = queue.take() consume(item) } } def consume(x: T)}val queue = new LinkedBlockingQueue[String]()// One thread for the producerval producer = new Producer[String]("users.txt", q)new Thread(producer).start()trait UserMaker { def makeUser(line: String) = line.split(",") match { case Array(name, userid) => User(name, userid.trim().toInt) }}class IndexerConsumer(index: InvertedIndex, queue: BlockingQueue[String]) extends Consumer[String](queue) with UserMaker { def consume(t: String) = index.add(makeUser(t))}// Let's pretend we have 8 cores on this machine.val cores = 8val pool = Executors.newFixedThreadPool(cores)// Submit one consumer per core.for (i <- i to cores) { pool.submit(new IndexerConsumer[String](index, q))}

上一页  [1] [2] 


Scala 并发编程