当前位置:K88软件开发文章中心编程语言SQLStorm → 文章内容

Storm 使用非 JVM 语言开发

减小字体 增大字体 作者:佚名  来源:网上搜集  发布时间:2019-1-18 8:37:42

由 小路依依 创建, 最后一次修改 2016-08-12 使用非 JVM 语言开发有时候你可能想使用不是基于 JVM 的语言开发一个 Storm 工程,你可能更喜欢使用别的语言或者想使用用某种语言编写的库。Storm 是用 Java 实现的,你看到的所有这本书中的 spout 和 bolt 都是用 java 编写的。那么有可能使用像 Python、Ruby、 或者 JavaScript 这样的语言编写 spout 和 bolt 吗?答案是当然可以!可以使用多语言协议达到这一目的。多语言协议是 Storm 实现的一种特殊的协议,它使用标准输入输出作为 spout 和 bolt 进程间的通讯通道。消息以 JSON 格式或纯文本格式在通道中传递。我们看一个用非 JVM 语言开发 spout 和 bolt 的简单例子。在这个例子中有一个 spout 产生从1到10,000的数字,一个 bolt 过滤素数,二者都用 PHP 实现。NOTE: 在这个例子中,我们使用一个很笨的办法验证素数。有更好当然也更复杂的方法,它们已经超出了这个例子的范围。有一个专门为 Storm 实现的 PHP DSL (译者注:领域特定语言),我们将会在例子中展示我们的实现。首先定义拓扑。1...2TopologyBuilder builder = new TopologyBuilder();3builder.setSpout("numbers-generator", new NumberGeneratorSpout(1, 10000));4builder.setBolt("prime-numbers-filter", new5PrimeNumbersFilterBolt()).shuffleGrouping("numbers-generator");6StormTopology topology = builder.createTopology();7... NOTE:有一种使用非 JVM 语言定义拓扑的方式。既然 Storm 拓扑是 Thrift 架构,而且Nimbus 是一个 Thrift 守护进程,你就可以使用任何你想用的语言创建并提交拓扑。但是这已经超出了本书的范畴了。这里没什么新鲜了。我们看一下 NumbersGeneratorSpout 的实现。01public class NumberGeneratorSpout extends ShellSpout implements IRichSpout {02 public NumberGeneratorSpout(Integer from, Integer to) {03 super("php", "-f", "NumberGeneratorSpout.php", from.toString(), to.toString());04 }05 public void declareOutputFields(OutputFieldsDeclarer declarer) {06 declarer.declare(new Fields("number"));07 }08 public Map<String, Object> getComponentConfiguration() {09 return null;10 }11} 你可能已经注意到了,这个 spout 继承了 ShellSpout。 这是个由 Storm 提供的特殊的类,用来帮助你运行并控制用其它语言编写的 spout。 在这种情况下它告诉 Storm 如何执行你的PHP 脚本。NumberGeneratorSpout 的 PHP 脚本向标准输出分发元组,并从标准输入读取确认或失败信号。在开始实现 NumberGeneratorSpout.php 脚本之前,多观察一下多语言协议是如何工作的。spout 按照传递给构造器的参数从 from 到 to 顺序生成数字。接下来看看 PrimeNumbersFilterBolt。 这个类实现了之前提到的壳。它告诉 Storm 如何执行你的PHP脚本。 Storm 为这一目的提供了一个特殊的叫做 ShellBolt 的类,你惟一要做的事就是指出如何运行脚本以及声明要分发的属性。1public class PrimeNumbersFilterBolt extends ShellBolt implements IRichBolt {2 public PrimeNumbersFilterBolt() {3 super("php", "-f", "PrimeNumbersFilterBolt.php");4 }5 public void declareOutputFields(OutputFieldsDeclarer declarer) {6 declarer.declare(new Fields("number"));7 }8} 在这个构造器中只是告诉 Storm 如何运行PHP脚本。它与下列命令等价。1php -f PrimeNumbersFilterBolt.php PrimeNumbersFilterBolt.php 脚本从标准输入读取元组,处理它们,然后向标准输出分发、确认或失败。在开始这个脚本之前,我们先多了解一些多语言协议的工作方式。发起一次握手开始循环读/写元组 NOTE:有一种特殊的方式可以使用 Storm 的内建日志机制在你的脚本中记录日志,所以你不需要自己实现日志系统。下面我们来看一看上述每一步的细节,以及如何用 PHP 实现它。发起握手为了控制整个流程(开始以及结束它),Storm 需要知道它执行的脚本进程号(PID)。根据多语言协议,你的进程开始时发生的第一件事就是 Storm 要向标准输入(译者注:根据上下文理解,本章提到的标准输入输出都是从非 JVM 语言的角度理解的,这里提到的标准输入也就是 PHP 的标准输入)发送一段 JSON 数据,它包含 Storm 配置、拓扑上下文和一个进程号目录。它看起来就像下面的样子:{ "conf": { "topology.message.timeout.secs": 3, // etc }, "context": { "task->component": { "1": "example-spout", "2": "__acker", "3": "example-bolt" }, "taskid": 3 }, "pidDir": "..."} 脚本进程必须在 pidDir 指定的目录下以自己的进程号为名字创建一个文件,并以 JSON 格式把进程号写到标准输出。{"pid": 1234}举个例子,如果你收到 /tmp/example\n 而你的脚本进程号是123,你应该创建一个名为 /tmp/example/123 的空文件并向标准输出打印文本行 {“pid”: 123}\n(译者注:此处原文只有一个 n,译者猜测应是排版错误)和 end\n。 这样 Storm 就能持续追踪进程号并在它关闭时杀死脚本进程。下面是 PHP 实现:1$config = json_decode(read_msg(), true);2$heartbeatdir = $config['pidDir'];3$pid = getmypid();4fclose(fopen("$heartbeatdir/$pid", "w"));5storm_send(["pid"=>$pid]);6flush(); 你已经实现了一个叫做 read_msg 的函数,用来处理从标准输入读取的消息。按照多语言协议的声明,消息可以是单行或多行 JSON 文本。一条消息以 end\n 结束。01function read_msg() {02 $msg = "";03 while(true) {04 $l = fgets(STDIN);05 $line = substr($l,0,-1);06 if($line=="end") {07 break;08 }09 $msg = "$msg$line\n";10 }11 return substr($msg, 0, -1);12}13function storm_send($json) {14 write_line(json_encode($json));15 write_line("end");16}17function write_line($line) {18 echo("$line\n");19} NOTE:flush() 方法非常重要;有可能字符缓冲只有在积累到一定程度时才会清空。这意味着你的脚

[1] [2]  下一页


Storm 使用非 JVM 语言开发