当前位置:K88软件开发文章中心编程语言SQLStorm → 文章内容

Storm Spouts

减小字体 增大字体 作者:佚名  来源:网上搜集  发布时间:2019-1-18 8:37:39

由 小路依依 创建, 最后一次修改 2016-08-12 Spouts你将在本章了解到 spout 作为拓扑入口和它的容错机制相关的最常见的设计策略。可靠的消息 VS 不可靠的消息在设计拓扑结构时,始终在头脑中记着的一件重要事情就是消息的可靠性。当有无法处理的消息时,你就要决定该怎么办,以及作为一个整体的拓扑结构该做些什么。举个例子,在处理银行存款时,不要丢失任何事务报文就是很重要的事情。但是如果你要统计分析数以百万的 tweeter 消息,即使有一条丢失了,仍然可以认为你的结果是准确的。对于 Storm 来说,根据每个拓扑的需要担保消息的可靠性是开发者的责任。这就涉及到消息可靠性和资源消耗之间的权衡。高可靠性的拓扑必须管理丢失的消息,必然消耗更多资源;可靠性较低的拓扑可能会丢失一些消息,占用的资源也相应更少。不论选择什么样的可靠性策略,Storm 都提供了不同的工具来实现它。要在 spout 中管理可靠性,你可以在分发时包含一个元组的消息 ID(collector.emit(new Values(…),tupleId))。在一个元组被正确的处理时调用 ack** 方法,而在失败时调用 fail** 方法。当一个元组被所有的靶 bolt 和锚 bolt 处理过,即可判定元组处理成功(你将在第5章学到更多锚 bolt 知识)。发生下列情况之一时为元组处理失败:提供数据的 spout 调用 collector.fail(tuple)处理时间超过配置的超时时间 让我们来看一个例子。想象你正在处理银行事务,需求如下:如果事务失败了,重新发送消息如果失败了太多次,终结拓扑运行 创建一个 spout 和一个 bolt,spout 随机发送100个事务 ID,有80%的元组不会被 bolt 收到(你可以在例子 ch04-spout 查看完整代码)。实现 spout 时利用 Map 分发事务消息元组,这样就比较容易实现重发消息。public void nextTuple() { if(!toSend.isEmpty()){ for(Map.Entry<Integer, String> transactionEntry : toSend.entrySet()){ Integer transactionId = transactionEntry.getKey(); String transactionMessage = transactionEntry.getValue(); collector.emit(new Values(transactionMessage),transactionId); } toSend.clear(); }} 如果有未发送的消息,得到每条事务消息和它的关联 ID,把它们作为一个元组发送出去,最后清空消息队列。值得一提的是,调用 map 的 clear 是安全的,因为 nextTuple 失败时,只有 ack 方法会修改 map,而它们都运行在一个线程内。维护两个 map 用来跟踪待发送的事务消息和每个事务的失败次数。ack 方法只是简单的把事务从每个列表中删除。public void ack(Object msgId) { messages.remove(msgId); failCounterMessages.remove(msgId);} fail 方法决定应该重新发送一条消息,还是已经失败太多次而放弃它。NOTE:如果你使用全部数据流组,而拓扑里的所有 bolt 都失败了,spout 的 fail 方法才会被调用。public void fail(Object msgId) { Integer transactionId = (Integer) msgId; //检查事务失败次数 Integer failures = transactionFailureCount.get(transactionId) + 1; if(failes >= MAX_FAILS){ //失败数太高了,终止拓扑 throw new RuntimeException("错误, transaction id 【"+ transactionId+"】 已失败太多次了 【"+failures+"】"); } //失败次数没有达到最大数,保存这个数字并重发此消息 transactionFailureCount.put(transactionId, failures); toSend.put(transactionId, messages.get(transactionId)); LOG.info("重发消息【"+msgId+"】");} 首先,检查事务失败次数。如果一个事务失败次数太多,通过抛出 RuntimeException 终止发送此条消息的工人。否则,保存失败次数,并把消息放入待发送队列(toSend),它就会再次调用 nextTuple 时得以重新发送。 NOTE:Storm 节点不维护状态,因此如果你在内存保存信息(就像本例做的那样),而节点又不幸挂了,你就会丢失所有缓存的消息。Storm 是一个快速失败的系统。拓扑会在抛出异常时挂掉,然后再由 Storm 重启,恢复到抛出异常前的状态。获取数据接下来你会了解到一些设计 spout 的技巧,帮助你从多数据源获取数据。直接连接在一个直接连接的架构中,spout 直接与一个消息分发器连接。 图 直接连接的 spout这个架构很容易实现,尤其是在消息分发器是已知设备或已知设备组时。已知设备满足:拓扑从启动时就已知道该设备,并贯穿拓扑的整个生命周期保持不变。未知设备就是在拓扑运行期添加进来的。已知设备组就是从拓扑启动时组内所有设备都是已知的。下面举个例子说明这一点。创建一个 spout 使用 Twitter 流 API 读取 twitter 数据流。spout 把 API 当作消息分发器直接连接。从数据流中得到符合 track 参数的公共 tweets(参考 twitter 开发页面)。完整的例子可以在链接 https://github.com/storm-book/examples-ch04-spouts/找到。spout 从配置对象得到连接参数(track,user,password),并连接到 API(在这个例子中使用 Apache 的 DefaultHttpClient)。它一次读一行数据,并把数据从 JSON 转化成 Java 对象,然后发布它。public void nextTuple() { //创建http客户端 client = new DefaultHttpClient(); client.setCredentialsProvider(credentialProvider); HttpGet get = new HttpGet(STREAMING_API_URL+track); HttpResponse response; try { //执行http访问 response = client.execute(get); StatusLine status = response.getStatusLine(); if(status.getStatusCode() == 200){ InputStream inputStream = response.getEntity().getContent(); BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream)); String in; //逐行读取数据 while((in = reader.readLine())!=null){ try{ //转化并发布消息 Object json = jsonParser.parse(in); collector.emit(new Values(track,json)); }ca

[1] [2]  下一页


Storm Spouts