当前位置:K88软件开发文章中心大数据Apache Storm → 文章内容

Apache Storm Trident

减小字体 增大字体 作者:佚名  来源:网上搜集  发布时间:2019-1-26 10:05:59

combine(Long val1, Long val2) { return val1 + val2; } @Override public Long zero() { return 0L; }}分组分组操作是一个内置操作,可以由groupBy方法调用。groupBy方法通过在指定字段上执行partitionBy来重新分区流,然后在每个分区中,它将组字段相等的元组组合在一起。通常,我们使用“groupBy”以及“persistentAggregate”来获得分组聚合。示例代码如下 - TridentTopology topology = new TridentTopology();// persistentAggregate - saving the count to memorytopology.newStream("spout", spout) .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”)) .groupBy(new Fields(“d”) .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));合并和连接合并和连接可以分别通过使用“合并”和“连接”方法来完成。合并组合一个或多个流。加入类似于合并,除了加入使用来自两边的trident tuple字段来检查和连接两个流的事实。此外,加入将只在批量级别工作。示例代码如下 - TridentTopology topology = new TridentTopology();topology.merge(stream1, stream2, stream3);topology.join(stream1, new Fields("key"), stream2, new Fields("x"), new Fields("key", "a", "b", "c"));状态维护Trident提供了状态维护的机制。状态信息可以存储在拓扑本身中,否则也可以将其存储在单独的数据库中。原因是维护一个状态,如果任何元组在处理过程中失败,则重试失败的元组。这会在更新状态时产生问题,因为您不确定此元组的状态是否已在之前更新过。如果在更新状态之前元组已经失败,则重试该元组将使状态稳定。然而,如果元组在更新状态后失败,则重试相同的元组将再次增加数据库中的计数并使状态不稳定。需要执行以下步骤以确保消息仅处理一次 - 小批量处理元组。为每个批次分配唯一的ID。如果重试批次,则给予相同的唯一ID。状态更新在批次之间排序。例如,第二批次的状态更新将不可能,直到第一批次的状态更新完成为止。分布式RPC 分布式RPC用于查询和检索Trident拓扑结果。 Storm有一个内置的分布式RPC服务器。分布式RPC服务器从客户端接收RPC请求并将其传递到拓扑。拓扑处理请求并将结果发送到分布式RPC服务器,分布式RPC服务器将其重定向到客户端。Trident的分布式RPC查询像正常的RPC查询一样执行,除了这些查询并行运行的事实。什么时候使用Trident?在许多使用情况下,如果要求是只处理一次查询,我们可以通过在Trident中编写拓扑来实现。另一方面,在Storm的情况下将难以实现精确的一次处理。因此,Trident将对那些需要一次处理的用例有用。Trident不适用于所有用例,特别是高性能用例,因为它增加了Storm的复杂性并管理状态。Trident的工作实例我们将把上一节中制定的呼叫日志分析器应用程序转换为Trident框架。由于其高级API,Trident应用程序将比普通风暴更容易。Storm基本上需要执行Trident中的Function,Filter,Aggregate,GroupBy,Join和Merge操作中的任何一个。最后,我们将使用LocalDRPC类启动DRPC服务器,并使用LocalDRPC类的execute方法搜索一些关键字。格式化呼叫信息FormatCall类的目的是格式化包括“呼叫者号码”和“接收者号码”的呼叫信息。完整的程序代码如下 - 编码:FormatCall.java import backtype.storm.tuple.Values;import storm.trident.operation.BaseFunction;import storm.trident.operation.TridentCollector;import storm.trident.tuple.TridentTuple;public class FormatCall extends BaseFunction { @Override public void execute(TridentTuple tuple, TridentCollector collector) { String fromMobileNumber = tuple.getString(0); String toMobileNumber = tuple.getString(1); collector.emit(new Values(fromMobileNumber + " - " + toMobileNumber)); }} CSVSplit CSVSplit类的目的是基于“comma(,)”拆分输入字符串,并发出字符串中的每个字。此函数用于解析分布式查询的输入参数。完整的代码如下 - 编码:CSVSplit.java import backtype.storm.tuple.Values;import storm.trident.operation.BaseFunction;import storm.trident.operation.TridentCollector;import storm.trident.tuple.TridentTuple;public class CSVSplit extends BaseFunction { @Override public void execute(TridentTuple tuple, TridentCollector collector) { for(String word: tuple.getString(0).split(",")) { if(word.length() > 0) { collector.emit(new Values(word)); } } }}日志分析器这是主要的应用程序。最初,应用程序将使用FeederBatchSpout初始化TridentTopology并提供调用者信息。Trident拓扑流可以使用TridentTopology类的newStream方法创建。类似地,Trident拓扑DRPC流可以使用TridentTopology类的newDRCPStream方法创建。可以使用LocalDRPC类创建一个简单的DRCP服务器。LocalDRPC有execute方法来搜索一些关键字。完整的代码如下。编码:LogAnalyserTrident.java import java.util.*;import backtype.storm.Config;import backtype.storm.LocalCluster;import backtype.storm.LocalDRPC;import backtype.storm.utils.DRPCClient;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import storm.trident.TridentState;import storm.trident.TridentTopology;import storm.trident.tuple.TridentTuple;import storm.trident.operation.builtin.FilterNull;import storm.trident.operation.builtin.Count;import storm.trident.operation.builtin.Sum;import storm.trident.operation.builtin.MapGet;import storm.trident.operation.builtin.Debug;import storm.trident.operation.BaseFilter;import storm.trident.testing.FixedBatchSpout;import storm.trident.testing.FeederBatchSpout;import storm.trident.testing.Split;import storm.trident.testing.MemoryMapState;import com.google.common.collect.ImmutableList;public class LogAnalyserTrident { public static voi

上一页  [1] [2] [3]  下一页


Apache Storm Trident