当前位置:K88软件开发文章中心编程语言SQLStorm → 文章内容

Storm 一个实际的例子

减小字体 增大字体 作者:佚名  来源:网上搜集  发布时间:2019-1-18 8:37:41

er\",\"category\":\"Covers\",\"price\":30,\"id\":415}" 用户浏览队列用户浏览队列保存在 Redis 中一个键为 navigation 的先进先出队列中。用户浏览一个产品页时,服务器从队列左侧添加用户浏览数据。Storm 集群不断的从队列右侧获取并移除数据。01redis 127.0.0.1:6379> llen navigation02(integer) 503redis 127.0.0.1:6379> lrange navigation 0 4041) "{\"user\":\"59c34159-0ecb-4ef3-a56b-99150346f8d5\",\"product\":\"1\",\"type\":05\"PRODUCT\"}"062) "{\"user\":\"59c34159-0ecb-4ef3-a56b-99150346f8d5\",\"product\":\"1\",\"type\":07\"PRODUCT\"}"083) "{\"user\":\"59c34159-0ecb-4ef3-a56b-99150346f8d5\",\"product\":\"2\",\"type\":09\"PRODUCT\"}"104) "{\"user\":\"59c34159-0ecb-4ef3-a56b-99150346f8d5\",\"product\":\"3\",\"type\":11\"PRODUCT\"}"125) "{\"user\":\"59c34159-0ecb-4ef3-a56b-99150346f8d5\",\"product\":\"5\",\"type\":13\"PRODUCT\"}" 中间数据集群需要分开保存每个用户的历史数据。为了实现这一点,它在 Redis 服务器上保存着一个包含所有用户浏览过的产品和它们的分类的集合。1redis 127.0.0.1:6379> smembers history:59c34159-0ecb-4ef3-a56b-99150346f8d521) "1:Players"32) "5:Cameras"43) "2:Players"54) "3:Cameras" 结果Storm 集群生成关于用户浏览的有用数据,并把它们的产品 ID 保存在一个名为 “prodcnt” 的Redis hash 中。1redis 127.0.0.1:6379> hgetall prodcnt:221) "Players"32) "1"43) "Cameras"54) "2" 测试拓扑使用 LocalCluster 和一个本地 Redis 服务器执行测试。向 Redis 填充产品数据,伪造访问日志。我们的断言会在读取拓扑向 Redis 输出的数据时执行。测试用户用 java 和 groovy 完成。 测试架构初始化测试初始化由以下三步组成: 启动 LocalCluster 并提交拓扑。初始化在 AbstractAnalyticsTest 实现,所有测试用例都继承该类。当初始化多个 AbstractAnalyticsTest 子类对象时,由一个名为topologyStarted 的静态标志属性确定初始化工作只会进行一次。需要注意的是,sleep 语句是为了确保在试图获取结果之前 LocalCluster 已经正确启动了。01public abstract class AbstractAnalyticsTest extends Assert {02 def jedis03 static topologyStarted = false04 static sync= new Object()05 private void reconnect() {06 jedis = new Jedis(TopologyStarter.REDIS_HOST, TopologyStarter.REDIS_PORT)07 }08 @Before09 public void startTopology(){10 synchronized(sync){11 reconnect()12 if(!topologyStarted){13 jedis.flushAll()14 populateProducts()15 TopologyStarter.testing = true16 TopologyStarter.main(null)17 topologyStarted = true18 sleep 100019 }20 }21 }22 ...23 public void populateProducts() {24 def testProducts = [25 [id: 0, title:"Dvd player with surround sound system",26 category:"Players", price: 100],27 [id: 1, title:"Full HD Bluray and DVD player",28 category:"Players", price:130],29 [id: 2, title:"Media player with USB 2.0 input",30 category:"Players", price:70],31 ...32 [id: 21, title:"TV Wall mount bracket 50-55 Inches",33 category:"Mounts", price:80]34 ]35 testProducts.each() { product ->36 def val =37 "{ \"title\": \"${product.title}\" , \"category\": \"${product.category}\"," +38 " \"price\": ${product.price}, \"id\": ${product.id} }"39 println val40 jedis.set(product.id.toString(), val.toString())41 }42 }43 ...44} 在 AbstractAnalyticsTest 中实现一个名为 navigate 的方法。为了测试不同的场景,我们要模拟用户浏览站点的行为,这一步向 Redis 的浏览队列(译者注:就是前文提到的键是navigation 的队列)插入浏览数据。01public abstract class AbstractAnalyticsTest extends Assert {02 ...03public void navigate(user, product) {04 String nav =05 "{\"user\": \"${user}\", \"product\": \"${product}\", \"type\": \"PRODUCT\"}".toString()06 println "Pushing navigation: ${nav}"07 jedis.lpush('navigation', nav)08 }09 ...10} 实现一个名为 getProductCategoryStats 的方法,用来读取指定产品与分类的数据。不同的测试同样需要断言统计结果,以便检查拓扑是否按照期望的那样执行了。01public abstract class AbstractAnalyticsTest extends Assert {02 ...03 public int getProductCategoryStats(String product, String categ) {04 String count = jedis.hget("prodcnt:${product}", categ)05 if(count == null || "nil".equals(count))06 return 007 return Integer.valueOf(count)08 }09 ...10} 一个测试用例下一步,为用户“1”模拟一些浏览记录,并检查结果。注意执行断言之前要给系统留出两秒钟处理数据。(记住 ProductCategoryCounterBolt 维护着一份计数的本地副本,它是在后台异步保存到 Redis 的。)01package functional02class StatsTest extends AbstractAnalyticsTest {03 @Test04 public void testNoDuplication(){05 navigate("1", "0") // Players06 navigate("1", "1") // Players07 navigate("1", "2") // Players08 navigate("1", "3") // Cameras09 Thread.sleep(2000) // Give two seconds for the system to process the data.10 assertEquals 1, getProductCategoryStats("0", "Cameras")11 assertEquals 1, getProductCategoryStats("1", "Cameras")12 assertEquals 1, getProductCategoryStats("2", "Cameras")13 assertEquals 2, getProductCategoryStats("0", "Players")14 assertEquals 3, getProductCategoryStats("3", "Players")15 }16} 对可扩展性和可用性的提示为了能在一章的篇幅中讲明白整个方案,它已经被简化了。正因如此,一些与可扩展性和可用性有关的必要复杂性也被去掉了。这方面主要有两个问题。 Redis 服务器不只是一个故障的节点,还是性能瓶颈。你能接收的数据最多就是 Redis 能处理的那些。Redis 可以通过分片增强扩展性,它的可用性可以通过主从配置得到改进。这都需要修改拓扑和 web 应用的代码实现。 另一个缺点就是 web 应用不能通过增加服务器成比例的扩展。这是因为当产品统计数据发生变化时,需要通知所有关注它的浏览器。这一“通知浏览器”的机制通过 Socket.io 实现,但是它要求监听器和通知器在同一主机上。这一点只有当 GET /product/:id/stats 和 POST /news 满足以下条件时才能实现,那就是这二者拥有相同的分片标准,确保引用相同产品的请求由相同的服务器处理。

上一页  [1] [2] [3] [4] 


Storm 一个实际的例子