当前位置:K88软件开发文章中心编程语言SQLStorm → 文章内容

Storm 一个实际的例子

减小字体 增大字体 作者:佚名  来源:网上搜集  发布时间:2019-1-18 8:37:41

story.add(product) 和 jedis.sadd(buildKey(user),product) 同时更新内存数据结构和 Redis 服务器。需要注意的是,当你需要做并行化处理时,只要 bolt 在内存中维护着用户数据,你就得首先通过用户做域数据流分组(译者注:原文是 fieldsGrouping,详细情况请见第三章的域数据流组),这是一件很重要的事情,否则集群内将会有用户浏览历史的多个不同步的副本。ProductCategoriesCounterBolt该类持续追踪所有的产品-类别关系。它通过由 UsersHistoryBolt 分发的产品-类别数据对更新计数。每个数据对的出现次数保存在 Redis 服务器。基于性能方面的考虑,要使用一个本地读写缓存,通过一个后台线程向 Redis 发送数据。该Bolt会向拓扑的下一个 Bolt ——NewsNotifierBolt——发送包含最新记数的元组,这也是最后一个 Bolt,它会向最终用户广播实时更新的数据。public class ProductCategoriesCounterBolt extends BaseRichBolt { ... @Override public void execute(){ String product = input.getString(0); String categ = input.getString(1); int total = count(product, categ); collector.emit(new Values(product, categ, total)); } ... private int count(String product, String categ) { int count = getProductCategoryCount(categ, product); count++; storeProductCategoryCount(categ, product, count); return count; } ...} 这个 bolt 的持久化工作隐藏在 getProductCategoryCount 和storeProductCategoryCount 两个方法中。它们的具体实现如下:package storm.analytics;...public class ProductCategoriesCounterBolt extends BaseRichBolt { // 条目:分类 -> 计数 HashMap<String,Integer> counter = new HashMap<String, Integer>(); //条目:分类 -> 计数 HashMap<String,Integer> pendingToSave = new HashMap<String,Integer>(); ... public int getProductCategoryCount(String categ, String product) { Integer count = counter.get(buildLocalKey(categ, product)); if(count == null) { String sCount = jedis.hget(buildRedisKey(product), categ); if(sCount == null || "nil".equals(sCount)) { count = 0; } else { count = Integer.valueOf(sCount); } } return count; } ... private void storeProductCategoryCount(String categ, String product, int count) { String key = buildLocalKey(categ, product); counter.put(key, count); synchronized (pendingToSave) { pendingToSave.put(key, count); } } ...} 方法 getProductCategoryCount 首先检查内存缓存计数器。如果没有有效令牌,就从 Redis 服务器取得数据。方法 storeProductCategoryCount 更新计数器缓存和 pendingToSae 缓冲。缓冲数据由下述后台线程持久化。package storm.analytics;public class ProductCategoriesCounterBolt extends BaseRichBolt {... private void startDownloaderThread() { TimerTask t = startDownloaderThread() { @Override public void run () { HashMap<String, Integer> pendings; synchronized (pendingToSave) { pendings = pendingToSave; pendingToSave = new HashMap<String,Integer>(); } for (String key : pendings.keySet) { String[] keys = key.split(":"); String product = keys[0]; String categ = keys[1]; Integer count = pendings.get(key); jedis.hset(buildRedisKey(product), categ, count.toString()); } } }; timer = new Timer("Item categories downloader"); timer.scheduleAtFixedRate(t, downloadTime, downloadTime); } ...} 下载线程锁定 pendingToSave, 向 Redis 发送数据时会为其它线程创建一个新的缓冲。这段代码每隔 downloadTime 毫秒运行一次,这个值可由拓扑配置参数 download-time 配置。download-time 值越大,写入 Redis 的次数就越少,因为一对数据的连续计数只会向 Redis写一次。 NewsNotifierBolt为了让用户能够实时查看统计结果,由 NewsNotifierBolt 负责向web应用通知统计结果的变化。通知机制由 Apache HttpClient 通过 HTTP POST 访问由拓扑配置参数指定的 URL。POST 消息体是 JSON 格式。测试时把这个 bolt 从拜年中删除。01package storm.analytics;02...03public class NewsNotifierBolt extends BaseRichBolt {04...05@Override06public void execute(Tuple input) {07String product = input.getString(0);08String categ = input.getString(1);09int visits = input.getInteger(2);</code>1011String content = "{\"product\":\"+product+"\",\"categ\":\""+categ+"\",\"visits\":"+visits+"}";12HttpPost post = new HttpPost(webserver);13try {14post.setEntity(new StringEntity(content));15HttpResponse response = client.execute(post);16org.apache.http.util.EntityUtils.consume(response.getEntity());17} catch (Exception e) {18e.printStackTrace();19reconnect();20}21}22...23} Redis 服务器Redis 是一种先进的、基于内存的、支持持久化的键值存储(见http://redis.io)。本例使用它存储以下信息:产品信息,用来为 web 站点服务 用户浏览队列,用来为 Storm 拓扑提供数据Storm 拓扑的中间数据,用于拓扑发生故障时恢复数据Storm 拓扑的处理结果,也就是我们期望得到的结果。 产品信息Redis 服务器以产品 ID 作为键,以 JSON 字符串作为值保存着产品信息。1redis-cli2redis 127.0.0.1:6379> get 153"{\"title\":\"Kids smartphone cov

上一页  [1] [2] [3] [4]  下一页


Storm 一个实际的例子