当前位置:K88软件开发文章中心编程语言SQLStorm → 文章内容

Storm 一个实际的例子

减小字体 增大字体 作者:佚名  来源:网上搜集  发布时间:2019-1-18 8:37:41

由 小路依依 创建, 最后一次修改 2016-08-12 一个实际的例子本章要阐述一个典型的网络分析解决方案,而这类问题通常利用 Hadoop 批处理作为解决方案。与 Hadoop 不同的是,基于 Storm 的方案会实时输出结果。我们的这个例子有三个主要组件一个基于 Node.js 的 web 应用,用于测试系统一个 Redis 服务器,用于持久化数据一个 Storm 拓扑,用于分布式实时处理数据 图 架构概览NOTE:你如果想先把这个例子运行起来,请首先阅读附录C基于 Node.js 的 web 应用我们已经伪造了简单的电子商务网站。这个网站只有三个页面:一个主页、一个产品页和一个产品统计页面。这个应用基于 Express 和 Socket.io 两个框架实现了向浏览器推送内容更新。制作这个应用的目的是为了让你体验 Storm 集群功能并看到处理结果,但它不是本书的重点,所以我们不会对它的页面做更详细描述。主页这个页面提供了全部有效产品的链接。它从Redis服务器获取数据并在页面上把它们显示出来。这个页面的URL是http://localhost:3000/。有效产品:DVD 播放器(带环绕立体声系统)全高清蓝光 dvd 播放器媒体播放器(带 USB 2.0 接口)全高清摄像机防水高清摄像机防震防水高清摄像机反射式摄像机双核安卓智能手机(带 64GB SD卡)普通移动电话卫星电话64GB SD 卡32GB SD 卡16GB SD 卡粉红色智能手机壳黑色智能手机壳小山羊皮智能手机壳产品页产品页用来显示指定产品的相关信息,例如,价格、名称、分类。这个页面的URL是:http://localhost:3000/product/:id。 产品页:32英寸液晶电视分类:电视机价格:400相关分类产品统计页这个页面显示通过收集用户浏览站点,用Storm集群计算的统计信息。可以显示为如下概要:浏览这个产品的用户,在那些分类下面浏览了n次产品。该页的URL是:http://localhost:3000/product/:id/stats。浏览了该产品的用户也浏览了以下分类的产品:摄像机播放器手机壳存储卡启动这个 Node.js web 应用首先启动 Redis 服务器,然后执行如下命令启动 web 应用: node webapp/app.js 为了向你演示,这个应用会自动向 Redis 填充一些产品数据作为样本。Storm 拓扑为这个系统搭建 Storm 拓扑的目标是改进产品统计的实时性。产品统计页显示了一个分类计数器列表,用来显示访问了其它同类产品的用户数。这样可以帮助卖家了解他们的用户需求。拓扑接收浏览日志,并更新产品统计结果图 Storm 拓扑的输入与输出我们的 Storm 拓扑有五个组件:一个 spout 向拓扑提供数据,四个 bolt 完成统计任务。UsersNavigationSpout从用户浏览数据队列读取数据发送给拓扑GetCategoryBolt从Redis服务器读取产品信息,向数据流添加产品分类UserHistoryBolt读取用户以前的产品浏览记录,向下一步分发Product:Category键值对,在下一步更新计数器ProductCategoriesCounterBolt追踪用户浏览特定分类下的商品次数NewsNotifierBolt通知web应用立即更新用户界面下图展示了拓扑的工作方式(见图6-6)package storm.analytics;...public class TopologyStarter { public static void main(String[] args) { Logger.getRootLogger().removeAllAppenders(); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("read-feed", new UsersNavigationSpout(),3); builder.setBolt("get-categ", new GetCategoryBolt(),3) .shuffleGrouping("read-feed"); builder.setBolt("user-history", new UserHistoryBolt(),5) .fieldsGrouping("get-categ", new Fields("user")); builder.setBolt("product-categ-counter", new ProductCategoriesCounterBolt(),5) .fieldsGrouping("user-history", new Fields("product")); builder.setBolt("news-notifier", new NewsNotifierBolt(),5) .shuffleGrouping("product-categ-counter"); Config conf = new Config(); conf.setDebug(true); conf.put("redis-host",REDIS_HOST); conf.put("redis-port",REDIS_PORT); conf.put("webserver", WEBSERVER); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("analytics", conf, builder.createTopology()); }} Figure Storm拓扑UsersNavigationSpoutUsersNavigationSpout 负责向拓扑提供浏览数据。每条浏览数据都是一个用户浏览过的产品页的引用。它们都被 web 应用保存在 Redis 服务器。我们一会儿就要看到更多信息。NOTE:下面的代码块就是相关代码。package storm.analytics;public class UsersNavigationSpout extends BaseRichSpout { Jedis jedis; ... @Override public void nextTuple() { String content = jedis.rpop("navigation"); if(content==null || "nil".equals(content)){ try { Thread.sleep(300); } catch (InterruptedException e) {} } else { JSONObject obj=(JSONObject)JSONValue.parse(content); String user = obj.get("user").toString(); String product = obj.get("product").toString(); String type = obj.get("type").toString(); HashMap<String, String> map = new HashMap<String, String>(); map.put("product", product); NavigationEntry entry = new NavigationEntry(user, type, map); collector.emit(new Values(user, entry)); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("user", "otherdata")); }} spout 首先调用 jedis.rpop(“navigation”) 从 Redis 删除并返回 ”navigation” 列表最右边的元素。如果列表已经是空的,就休眠0.3秒,以免使用忙等待循环阻塞服务器。如果得到一条数据(数据是 JSON 格式),就解析它,并创建一个包含该数据的 NavigationEntry POJO:浏览页面的用户用户浏览的页面类型由页面类型决定的额外页面信息。“产品”页的额外信息就是用户浏览的产品 ID。

[1] [2] [3] [4]  下一页


Storm 一个实际的例子