当前位置:K88软件开发文章中心编程语言SQLStorm → 文章内容

Storm 一个实际的例子

减小字体 增大字体 作者:佚名  来源:网上搜集  发布时间:2019-1-18 8:37:41

spout 调用 collector.emit(new Values(user, entry)) 分发包含这些信息的元组。这个元组的内容是拓扑里下一个 bolt 的输入。GetCategoryBolt这个 bolt 非常简单。它只负责反序列化前面的 spout 分发的元组内容。如果这是产品页的数据,就通过 ProductsReader 类从 Redis 读取产品信息,然后基于输入的元组再分发一个新的包含具体产品信息的元组:用户产品产品类别 package storm.analytics;public class GetCategoryBolt extends BaseBasicBolt { private ProductReader reader; ... @Override public void execute(Tuple input, BasicOutputCollector collector) { NavigationEntry entry = (NavigationEntry)input.getValue(1); if("PRODUCT".equals(entry.getPageType())){ try { String product = (String)entry.getOtherData().get("product"); //调用产品条目API,得到产品信息 Product itm = reader.readItem(product); if(itm == null) { return; } String categ = itm.getCategory(); collector.emit(new Values(entry.getUserId(), product, categ)); } catch (Exception ex) { System.err.println("Error processing PRODUCT tuple"+ ex); ex.printStackTrace(); } } } ...} 正如前面所提到的, 使用 ProductsReader 类读取产品具体信息。package storm.analytics.utilities;...public class ProductReader { ... public Product readItem(String id) throws Exception{ String content = jedis.get(id); if(content == null || ("nil".equals(content))){ return null; } Object obj = JSONValue.parse(content); JSONObjectproduct = (JSONObject)obj; Product i = new Product((Long)product.get("id"), (String)product.get("title"), (Long)product.get("price"), (String)product.get("category")); return i; } ...} UserHistoryBoltUserHistoryBolt 是整个应用的核心。它负责持续追踪每个用户浏览过的产品,并决定应当增加计数的键值对。我们使用 Redis 保存用户的产品浏览历史,同时基于性能方面的考虑,还应该保留一份本地副本。我们把数据访问细节隐藏在方法 getUserNavigationHistory(user) 和addProductToHistory(user,prodKey) 里,分别用来读/写访问。它们的实现如下package storm.analytics;...public class UserHistoryBolt extends BaseRichBolt{ @Override public void execute(Tuple input) { String user = input.getString(0); String prod1 = input.getString(1); String cat1 = input.getString(2); //产品键嵌入了产品类别信息 String prodKey = prod1+":"+cat1; Set productsNavigated = getUserNavigationHistory(user); //如果用户以前浏览过->忽略它 if(!productsNavigated.contains(prodKey)) { //否则更新相关条目 for (String other : productsNavigated) { String[] ot = other.split(":"); String prod2 = ot[0]; String cat2 = ot[1]; collector.emit(new Values(prod1, cat2)); collector.emit(new Values(prod2, cat1)); } addProductToHistory(user, prodKey); } }} 需要注意的是,这个 bolt 的输出是那些类别计数应当获得增长的产品。看一看代码。这个 bolt 维护着一组被每个用户浏览过的产品。值得注意的是,这个集合包含产品:类别键值对,而不是只有产品。这是因为你会在接下来的调用中用到类别信息,而且这样也比每次从数据库获取更高效。这样做的原因是基于以下考虑,产品可能只有一个类别,而且它在整个产品的生命周期当中不会改变。读取了用户以前浏览过的产品集合之后(以及它们的类别),检查当前产品以前有没有被浏览过。如果浏览过,这条浏览数据就被忽略了。如果这是首次浏览,遍历用户浏览历史,并执行collector.emit(new Values(prod1,cat2)) 分发一个元组,这个元组包含当前产品和所有浏览历史类别。第二个元组包含所有浏览历史产品和当前产品类别,由 collectior.emit(new Values(prod2,cat1))。最后,将当前产品和它的类别添加到集合。比如,假设用户 John 有以下浏览历史:下面是将要处理的浏览数据该用户没有浏览过产品8,因此你需要处理它。因此要分发以下元组:注意,左边的产品和右边的类别之间的关系应当作为一个整体递增。现在,让我们看看这个 Bolt 用到的持久化实现。public class UserHistoryBolt extends BaseRichBolt{ ... private Set getUserNavigationHistory(String user) { Set userHistory = usersNavigatedItems.get(user); if(userHistory == null) { userHistory = jedis.smembers(buildKey(user)); if(userHistory == null) userHistory = new HashSet(); usersNavigatedItems.put(user, userHistory); } return userHistory; } private void addProductToHistory(String user, String product) { Set userHistory = getUserNavigationHistory(user); userHistory.add(product); jedis.sadd(buildKey(user), product); } ...} getUserNavigationHistory 方法返回用户浏览过的产品集。首先,通过usersNavigatedItems.get(user) 方法试图从本地内存得到用户浏览历史,否则,使用jedis.smembers(buildKey(user)) 从 Redis 服务器获取,并把数据添加到本地数据结构usersNavigatedItems。当用户浏览一个新产品时,调用 addProductToHistory,通过 userHi

上一页  [1] [2] [3] [4]  下一页


Storm 一个实际的例子