当前位置:K88软件开发文章中心大数据Apache Storm → 文章内容

Apache Storm Trident

减小字体 增大字体 作者:佚名  来源:网上搜集  发布时间:2019-1-26 10:05:59

d main(String[] args) throws Exception { System.out.println("Log Analyser Trident"); TridentTopology topology = new TridentTopology(); FeederBatchSpout testSpout = new FeederBatchSpout(ImmutableList.of("fromMobileNumber", "toMobileNumber", "duration")); TridentState callCounts = topology .newStream("fixed-batch-spout", testSpout) .each(new Fields("fromMobileNumber", "toMobileNumber"), new FormatCall(), new Fields("call")) .groupBy(new Fields("call")) .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count")); LocalDRPC drpc = new LocalDRPC(); topology.newDRPCStream("call_count", drpc) .stateQuery(callCounts, new Fields("args"), new MapGet(), new Fields("count")); topology.newDRPCStream("multiple_call_count", drpc) .each(new Fields("args"), new CSVSplit(), new Fields("call")) .groupBy(new Fields("call")) .stateQuery(callCounts, new Fields("call"), new MapGet(), new Fields("count")) .each(new Fields("call", "count"), new Debug()) .each(new Fields("count"), new FilterNull()) .aggregate(new Fields("count"), new Sum(), new Fields("sum")); Config conf = new Config(); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("trident", conf, topology.build()); Random randomGenerator = new Random(); int idx = 0; while(idx < 10) { testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123402", randomGenerator.nextInt(60)))); testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123403", randomGenerator.nextInt(60)))); testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123404", randomGenerator.nextInt(60)))); testSpout.feed(ImmutableList.of(new Values("1234123402", "1234123403", randomGenerator.nextInt(60)))); idx = idx + 1; } System.out.println("DRPC : Query starts"); System.out.println(drpc.execute("call_count","1234123401 - 1234123402")); System.out.println(drpc.execute("multiple_call_count", "1234123401 - 1234123402,1234123401 - 1234123403")); System.out.println("DRPC : Query ends"); cluster.shutdown(); drpc.shutdown(); // DRPCClient client = new DRPCClient("drpc.server.location", 3772); }}构建和运行应用程序完整的应用程序有三个Java代码。他们如下 - FormatCall.java CSVSplit.java LogAnalyerTrident.java 可以使用以下命令构建应用程序 - javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java可以使用以下命令运行应用程序 - java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserTrident输出一旦应用程序启动,应用程序将输出有关集群启动过程,操作处理,DRPC服务器和客户端信息的完整详细信息,以及最后的集群关闭过程。此输出将显示在控制台上,如下所示。DRPC : Query starts[["1234123401 - 1234123402",10]]DEBUG: [1234123401 - 1234123402, 10]DEBUG: [1234123401 - 1234123403, 10][[20]]DRPC : Query ends

上一页  [1] [2] [3] 


Apache Storm Trident