当前位置:K88软件开发文章中心编程语言SQLStorm → 文章内容

Storm 起步

减小字体 增大字体 作者:佚名  来源:网上搜集  发布时间:2019-1-18 8:37:37

─ java | ├── spouts | └── bolts └── resources java 目录下的子目录包含我们的代码,我们把要统计单词数的文件保存在 resource 目录下。NOTE:命令 mkdir -p 会创建所有需要的父目录。创建我们的第一个 Topology我们将为运行单词计数创建所有必要的类。可能这个例子中的某些部分,现在无法讲的很清楚,不过我们会在随后的章节做进一步的讲解。Spoutpout WordReader 类实现了 IRichSpout 接口。我们将在第四章看到更多细节。WordReader负责从文件按行读取文本,并把文本行提供给第一个 bolt。NOTE: 一个 spout 发布一个定义域列表。这个架构允许你使用不同的 bolts 从同一个spout 流读取数据,它们的输出也可作为其它 bolts 的定义域,以此类推。例2-1包含 WordRead 类的完整代码(我们将会分析下述代码的每一部分)。 / 例2-1.src/main/java/spouts/WordReader.java / package spouts; import java.io.BufferedReader; import java.io.FileNotFoundException; import java.io.FileReader; import java.util.Map; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichSpout; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; public class WordReader implements IRichSpout { private SpoutOutputCollector collector; private FileReader fileReader; private boolean completed = false; private TopologyContext context; public boolean isDistributed() {return false;} public void ack(Object msgId) { System.out.println("OK:"+msgId); } public void close() {} public void fail(Object msgId) { System.out.println("FAIL:"+msgId); } / 这个方法做的惟一一件事情就是分发文件中的文本行 / public void nextTuple() { / 这个方法会不断的被调用,直到整个文件都读完了,我们将等待并返回。 / if(completed){ try { Thread.sleep(1000); } catch (InterruptedException e) { //什么也不做 } return; } String str; //创建reader BufferedReader reader = new BufferedReader(fileReader); try{ //读所有文本行 while((str = reader.readLine()) != null){ / 按行发布一个新值 / this.collector.emit(new Values(str),str); } }catch(Exception e){ throw new RuntimeException("Error reading tuple",e); }finally{ completed = true; } } / 我们将创建一个文件并维持一个collector对象 / public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { try { this.context = context; this.fileReader = new FileReader(conf.get("wordsFile").toString()); } catch (FileNotFoundException e) { throw new RuntimeException("Error reading file ["+conf.get("wordFile")+"]"); } this.collector = collector; } / 声明输入域"word" / public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("line")); } } 第一个被调用的 spout 方法都是 public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)。它接收如下参数:配置对象,在定义topology 对象是创建;TopologyContext 对象,包含所有拓扑数据;还有SpoutOutputCollector 对象,它能让我们发布交给 bolts 处理的数据。下面的代码主是这个方法的实现。 public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { try { this.context = context; this.fileReader = new FileReader(conf.get("wordsFile").toString()); } catch (FileNotFoundException e) { throw new RuntimeException("Error reading file ["+conf.get("wordFile")+"]"); } this.collector = collector; } 我们在这个方法里创建了一个 FileReader 对象,用来读取文件。接下来我们要实现 public void nextTuple(),我们要通过它向 bolts 发布待处理的数据。在这个例子里,这个方法要读取文件并逐行发布数据。 public void nextTuple() { if(completed){ try { Thread.sleep(1); } catch (InterruptedException e) { //什么也不做 } return; } String str; BufferedReader reader = new BufferedReader(fileReader); try{ while((str = reader.readLine()) != null){

上一页  [1] [2] [3] [4]  下一页


Storm 起步