当前位置:K88软件开发文章中心编程语言SQLStorm → 文章内容

Storm 使用非 JVM 语言开发

减小字体 增大字体 作者:佚名  来源:网上搜集  发布时间:2019-1-18 8:37:42

本可能会为了等待一个来自 Storm 的输入而永远挂起,而 Storm 却在等待来自你的脚本的输出。因此当你的脚本有内容输出时立即清空缓冲是很重要的。开始循环以及读/写元组这是整个工作中最重要的一步。这一步的实现取决于你开发的 spout 和 bolt。如果是 spout,你应当开始分发元组。如果是 bolt,就循环读取元组,处理它们,分发它发,确认成功或失败。下面我们就看看用来分发数字的 spout。01$from = intval($argv[1]);02$to = intval($argv[2]);03while(true) {04 $msg = read_msg();05 $cmd = json_decode($msg, true);06 if ($cmd['command']=='next') {07 if ($from<$to) {08 storm_emit(array("$from"));09 $task_ids = read_msg();10 $from++;11 } else {12 sleep(1);13 }14 }15 storm_sync();16} 从命令行获取参数 from 和 to,并开始迭代。每次从 Storm 得到一条 next 消息,这意味着你已准备好分发下一个元组。一旦你发送了所有的数字,而且没有更多元组可发了,就休眠一段时间。为了确保脚本已准备好发送下一个元组,Storm 会在发送下一条之前等待 sync\n 文本行。调用 read_msg(),读取一条命令,解析 JSON。对于 bolts 来说,有少许不同。01while(true) {02 $msg = read_msg();03 $tuple = json_decode($msg, true, 512, JSON_BIGINT_AS_STRING);04 if (!empty($tuple["id"])) {05 if (isPrime($tuple["tuple"][0])) {06 storm_emit(array($tuple["tuple"][0]));07 }08 storm_ack($tuple["id"]);09 }10} 循环的从标准输入读取元组。解析读取每一条 JSON 消息,判断它是不是一个元组,如果是,再检查它是不是一个素数,如果是素数再次分发一个元组,否则就忽略掉,最后不论如何都要确认成功。NOTE:在 json_decode 函数中使用的 JSON_BIGINT_AS_STRING 是为了解决一个在JAVA 和 PHP 之间的数据转换问题。JAVA发送的一些很大的数字,在 PHP 中会丢失精度,这样就会导致问题。为了避开这个问题,告诉PHP把大数字当作字符串处理,并在 JSON 消息中输出数字时不使用双引号。PHP5.4.0 或更高版本要求使用这个参数。emit,ack,fail,以及 log 消息都是如下结构:emit{ "command": "emit", "tuple": ["foo", "bar"]} 其中的数组包含了你分发的元组数据。ack{ "command": "ack", "id": 123456789} 其中的 id 就是你处理的元组的 ID。 fail{ "command": "fail", "id": 123456789} 与 ack(译者注:原文是 emit 从上下 JSON 的内容和每个方法的功能上判断此处就是 ack,可能是排版错误)相同,其中 id 就是你处理的元组 ID。 log{ "command": "log", "msg": "some message to be logged by storm."} 下面是完整的的 PHP 代码。001//你的spout:002<?php003function read_msg() {004 $msg = "";005 while(true) {006 $l = fgets(STDIN);007 $line = substr($l,0,-1);008 if ($line=="end") {009 break;010 }011 $msg = "$msg$line\n";012 }013 return substr($msg, 0, -1);014}015function write_line($line) {016 echo("$line\n");017}018function storm_emit($tuple) {019 $msg = array("command" => "emit", "tuple" => $tuple);020 storm_send($msg);021}022function storm_send($json) {023 write_line(json_encode($json));024 write_line("end");025}026function storm_sync() {027 storm_send(array("command" => "sync"));028}029function storm_log($msg) {030 $msg = array("command" => "log", "msg" => $msg);031 storm_send($msg);032 flush();033}034$config = json_decode(read_msg(), true);035$heartbeatdir = $config['pidDir'];036$pid = getmypid();037fclose(fopen("$heartbeatdir/$pid", "w"));038storm_send(["pid"=>$pid]);039flush();040$from = intval($argv[1]);041$to = intval($argv[2]);042while(true) {043 $msg = read_msg();044 $cmd = json_decode($msg, true);045 if ($cmd['command']=='next') {046 if ($from<$to) {047 storm_emit(array("$from"));048 $task_ids = read_msg();049 $from++;050 } else {051 sleep(1);052 }053 }054 storm_sync();055}056?>057//你的bolt:058<?php059function isPrime($number) {060 if ($number < 2) {061 return false;062 }063 if ($number==2) {064 return true;065 }066 for ($i=2; $i<=$number-1; $i++) {067 if ($number % $i == 0) {068 return false;069 }070 }071 return true;072}073function read_msg() {074 $msg = "";075 while(true) {076 $l = fgets(STDIN);077 $line = substr($l,0,-1);078 if ($line=="end") {079 break;080 }081 $msg = "$msg$line\n";082 }083 return substr($msg, 0, -1);084}085function write_line($line) {086 echo("$line\n");087}088function storm_emit($tuple) {089 $msg = array("command" => "emit", "tuple" => $tuple);090 storm_send($msg);091}092function storm_send($json) {093 write_line(json_encode($json));094 write_line("end");095}096function storm_ack($id) {097 storm_send(["command"=>"ack", "id"=>"$id"]);098}099function storm_log($msg) {100 $msg = array("command" => "log", "msg" => "$msg");101 storm_send($msg);102}103$config = json_decode(read_msg(), true);104$heartbeatdir = $config['pidDir'];105$pid = getmypid();106fclose(fopen("$heartbeatdir/$pid", "w"));107storm_send(["pid"=>$pid]);108flush();109while(true) {110 $msg = read_msg();111 $tuple = json_decode($msg, true, 512, JSON_BIGINT_AS_STRING);112 if (!empty($tuple["id"])) {113 if (isPrime($tuple["tuple"][0])) {114 storm_emit(array($tuple["tuple"][0]));115 }116 storm_ack($tuple["id"]);117 }118}119?> NOTE:需要重点指出的是,应当把所有的脚本文件保存在你的工程目录下的一个名为multilang/resources 的子目录中。这个子目录被包含在发送给工人进程的 jar 文件中。如果你不把脚本包含在这个目录中,Storm 就不能运行它们,并抛出一个错误。

上一页  [1] [2] 


Storm 使用非 JVM 语言开发