当前位置:K88软件开发文章中心编程语言SQLscala → 文章内容

Searchbird

减小字体 增大字体 作者:佚名  来源:网上搜集  发布时间:2019-1-19 4:45:26

everse = new mutable.HashMap[String, Set[String]] with mutable.SynchronizedMap[String, Set[String]] def get(key: String) = { forward.get(key) match { case None => log.debug("get %s: miss", key) Future.exception(SearchbirdException("No such key")) case Some(value) => log.debug("get %s: hit", key) Future(value) } } def put(key: String, value: String) = { log.debug("put %s", key) forward(key) = value // admit only one updater. synchronized { (Set() ++ value.split(" ")) foreach { token => val current = reverse.get(token) getOrElse Set() reverse(token) = current + key } } Future.Unit } def search(query: String) = Future.value { val tokens = query.split(" ") val hits = tokens map { token => reverse.getOrElse(token, Set()) } val intersected = hits reduceLeftOption { _ & _ } getOrElse Set() intersected.toList }}现在,我们把 thrift 服务转换成一个简单的调度机制:为每一个索引实例提供一个 thrift 接口。这是一个强大的抽象,因为它分离了索引实现和服务实现。服务不再知道索引的任何细节;索引可以是本地的或远程的,甚至可能是许多索引的组合,但服务并不关心,索引实现可能会更改但是不用修改服务。将 SearchbirdServiceImpl 类定义更换为以下(简单得多)的代码(其中不再包含索引实现细节)。注意初始化服务器现在需要第二个参数 Index 。…/SearchbirdServiceImpl.scalaclass SearchbirdServiceImpl(config: SearchbirdServiceConfig, index: Index) extends SearchbirdService.ThriftServer { val serverName = "Searchbird" val thriftPort = config.thriftPort def get(key: String) = index.get(key) def put(key: String, value: String) = index.put(key, value) flatMap { _ => Future.Unit } def search(query: String) = index.search(query) def shutdown() = { super.shutdown(0.seconds) }}…/config/SearchbirdServiceConfig.scala相应地更新 SearchbirdServiceConfig 的 apply 调用:class SearchbirdServiceConfig extends ServerConfig[SearchbirdService.ThriftServer] { var thriftPort: Int = 9999 var tracerFactory: Tracer.Factory = NullTracer.factory def apply(runtime: RuntimeEnvironment) = new SearchbirdServiceImpl(this, new ResidentIndex)}我们将建立一个简单的分布式系统,一个主节点组织查询其子节点。为了实现这一目标,我们将需要两个新的 Index 类型。一个代表远程索引,另一种是其他多个 Index 实例的组合索引。这样我们的服务就可以实例化多个远程索引的复合索引来构建分布式索引。请注意这两个 Index 类型具有相同的接口,所以服务器不需要知道它们所连接的索引是远程的还是复合的。…/Index.scala在 Index.scala 中定义了 CompositeIndex :class CompositeIndex(indices: Seq[Index]) extends Index { require(!indices.isEmpty) def get(key: String) = { val queries = indices.map { idx => idx.get(key) map { r => Some(r) } handle { case e => None } } Future.collect(queries) flatMap { results => results.find { _.isDefined } map { _.get } match { case Some(v) => Future.value(v) case None => Future.exception(SearchbirdException("No such key")) } } } def put(key: String, value: String) = Future.exception(SearchbirdException("put() not supported by CompositeIndex")) def search(query: String) = { val queries = indices.map { _.search(query) rescue { case _=> Future.value(Nil) } } Future.collect(queries) map { results => (Set() ++ results.flatten) toList } }}组合索引构建在一组相关 Index 实例的基础上。注意它并不关心这些实例实际上是如何实现的。这种组合类型在构建不同查询机制的时候具有极大的灵活性。我们没有定义拆分机制,所以复合索引不支持 put 操作。这些请求被直接交由子节点处理。 get 的实现是查询所有子节点,并提取第一个成功的结果。如果没有成功结果的话,则抛出一个异常。注意因为没有结果是通过抛出一个异常表示的,所以我们 处理Future ,是将任何异常转换成 None 。在实际系统中,我们很可能会为遗漏值填入适当的错误码,而不是使用异常。异常在构建原型时是方便和适宜的,但不能很好地组合。为了把真正的例外和遗漏值区分开,必须要检查异常本身。相反,把这种区别直接嵌入在返回值的类型中是更好的风格。search 像以前一样工作。和提取第一个结果不同,我们把它们组合起来,通过使用 Set 确保其唯一性。RemoteIndex 提供了到远程服务器的一个 Index 接口。class RemoteIndex(hosts: String) extends Index { val transport = ClientBuilder() .name("remoteIndex") .hosts(hosts) .codec(ThriftClientFramedCodec()) .hostConnectionLimit(1) .timeout(500.milliseconds) .build() val client = new SearchbirdService.FinagledClient(transport) def get(key: String) = client.get(key) def put(key: String, value: String) = client.put(key, value) map { _ => () } def search(query: String) = client.search(query) map { _.toList }}这样就使用一些合理的默认值,调用代理,稍微调整类型,就构造出一个 finagle thrift 客户端。全部放在一起现在我们拥有了需要的所有功能。我们需要调整配置,以便能够调用一个给定的节点,不管是主节点亦或是数据分片节点。为了做到这一点,我们将通过创建一个新的配置项来在系统中枚举分片。我们还需要添加 Index 参数到我们的 SearchbirdServiceImpl 实例。然后,我们将使用命令行参数(还记得 Config 是如何做到的吗)在这两种模式中启动服务器。…/config/SearchbirdServiceConfig.scalaclass SearchbirdServiceConfig extends S

上一页  [1] [2] [3] [4] [5]  下一页


Searchbird