当前位置:K88软件开发文章中心大数据Apache Storm → 文章内容

Apache Storm工作实例

减小字体 增大字体 作者:佚名  来源:网上搜集  发布时间:2019-1-26 10:05:58

由 chalex 创建,小路依依 最后一次修改 2016-12-12 我们已经经历了Apache Storm的核心技术细节,现在是时候编写一些简单的场景。场景 - 移动呼叫日志分析器移动呼叫及其持续时间将作为对Apache Storm的输入,Storm将处理和分组在相同呼叫者和接收者之间的呼叫及其呼叫总数。Spout创建Spout是用于数据生成的组件。基本上,一个spout将实现一个IRichSpout接口。 “IRichSpout”接口有以下重要方法 - open -为Spout提供执行环境。执行器将运行此方法来初始化喷头。 nextTuple -通过收集器发出生成的数据。 close -当spout将要关闭时调用此方法。 declareOutputFields -声明元组的输出模式。 ack -确认处理了特定元组。 fail -指定不处理和不重新处理特定元组。open open方法的签名如下 - open(Map conf, TopologyContext context, SpoutOutputCollector collector)conf - 为此spout提供storm配置。 context - 提供有关拓扑中的spout位置,其任务ID,输入和输出信息的完整信息。 collector - 使我们能够发出将由bolts处理的元组。 nextTuple nextTuple方法的签名如下 - nextTuple() nextTuple()从与ack()和fail()方法相同的循环中定期调用。它必须释放线程的控制,当没有工作要做,以便其他方法有机会被调用。因此,nextTuple的第一行检查处理是否已完成。如果是这样,它应该休眠至少一毫秒,以减少处理器在返回之前的负载。close close方法的签名如下- close() declareOutputFields declareOutputFields方法的签名如下- declareOutputFields(OutputFieldsDeclarer declarer) declarer -它用于声明输出流id,输出字段等此方法用于指定元组的输出模式。ackack方法的签名如下 - ack(Object msgId)该方法确认已经处理了特定元组。failnextTuple方法的签名如下- ack(Object msgId)此方法通知特定元组尚未完全处理。 Storm将重新处理特定的元组。 FakeCallLogReaderSpout 在我们的场景中,我们需要收集呼叫日志详细信息。呼叫日志的信息包含。主叫号码接收号码持续时间由于我们没有呼叫日志的实时信息,我们将生成假呼叫日志。假信息将使用Random类创建。完整的程序代码如下。 编码 - FakeCallLogReaderSpout.java import java.util.*;//import storm tuple packagesimport backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;//import Spout interface packagesimport backtype.storm.topology.IRichSpout;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.TopologyContext;//Create a class FakeLogReaderSpout which implement IRichSpout interface to access functionalitiespublic class FakeCallLogReaderSpout implements IRichSpout { //Create instance for SpoutOutputCollector which passes tuples to bolt. private SpoutOutputCollector collector; private boolean completed = false; //Create instance for TopologyContext which contains topology data. private TopologyContext context; //Create instance for Random class. private Random randomGenerator = new Random(); private Integer idx = 0; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.context = context; this.collector = collector; } @Override public void nextTuple() { if(this.idx <= 1000) { List<String> mobileNumbers = new ArrayList<String>(); mobileNumbers.add("1234123401"); mobileNumbers.add("1234123402"); mobileNumbers.add("1234123403"); mobileNumbers.add("1234123404"); Integer localIdx = 0; while(localIdx++ < 100 && this.idx++ < 1000) { String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4)); String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4)); while(fromMobileNumber == toMobileNumber) { toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4)); } Integer duration = randomGenerator.nextInt(60); this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration)); } } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("from", "to", "duration")); } //Override all the interface methods @Override public void close() {} public boolean isDistributed() { return false; } @Override public void activate() {} @Override public void deactivate() {} @Override public void ack(Object msgId) {} @Override public void fail(Object msgId) {} @Override public Map<String, Object> getComponentConfiguration() { return null; }}Bolt创建Bolt是一个使用元组作为输入,处理元组,并产生新的元组作为输出的组件。Bolts将实现IRichBolt接口。在此程序中,使用两个Bolts 类CallLogCreatorBolt和CallLogCounterBolt来执行操作。 IRichBolt接口有以下方法 - prepare -为bolt提供要执行的环境。执行器将运行此方法来初始化spout。 execute -处理单个元组的输入 cleanup -当spout要关闭时调用。 declareOutputFields -声明元组的输出模式。 Prepareprepare方法的签名如下 - prepare(Map conf, TopologyContext context, OutputCollector collector)conf -为此bolt提供Storm配置。 context -提供有关拓扑中的bolt位置,其任务ID,输入和输出信息等的完整信息。 collector -使我们能够发出处理的元组。execute execute方法的签名如下- execute(Tuple tuple)这里的元组是要处理的输入元组。 execute方法一次处理单个元组。元组数据可以通过Tuple类的getValue方法访问。不必立即处理输

[1] [2] [3]  下一页


Apache Storm工作实例