Storm分布式RPC怎么配置

这篇文章主要介绍“Storm分布式RPC怎么配置”,在日常操作中,相信很多人在Storm分布式RPC怎么配置问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Storm分布式RPC怎么配置”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

创新互联长期为近1000家客户提供的网站建设服务,团队从业经验10年,关注不同地域、不同群体,并针对不同对象提供差异化的产品和服务;打造开放共赢平台,与合作伙伴共同营造健康的互联网生态环境。为龙凤企业提供专业的成都做网站、成都网站建设、成都外贸网站建设龙凤网站改版等技术服务。拥有10余年丰富建站经验和众多成功案例,为您定制开发。

首先需要在storm集群上把DRPC的环境准备好,在storm.yaml当中增加如下内容

 drpc.servers:
  - "192.168.1.118"

之后通过storm drpc启动分布式RPC服务。

之后,跟其他的topology并没有什么不同,我们需要写点代码,我这边直接从storm的例子当中找了个:

public class BasicDRPCTopology {
    public static class ExclaimBolt extends BaseBasicBolt {
        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String input = tuple.getString(1);
            collector.emit(new Values(tuple.getValue(0), input + "!"));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("id", "result"));
        }

    }

    public static void main(String[] args) throws Exception {
        LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
        builder.addBolt(new ExclaimBolt(), 3);

        Config conf = new Config();
        conf.setNumWorkers(3);
        StormSubmitter.submitTopologyWithProgressBar("DRCP-TEST", conf, builder.createRemoteTopology());
    }
}

从main函数开始,简单解释一下:

首先new一个LinearDRPCTopologyBuilder对象,其中的参数【exclamation】就是我们在执行rpc调用时候的方法名。

之后我们加入一个自己的bolt,并行数量为3

之后用StormSubmitter把这个topology提交上去就行了。

代码完成之后,打一个jar包,用storm jar把topology提交到集群上。

客户端调用,非常简单

        DRPCClient client = new DRPCClient("192.168.1.118", 3772);
        String result = client.execute("exclamation", "china");
        System.out.println(result);

到此为止,一个最简单的DRPC调用的工作已经完成了。

等等,还有点问题,LinearDRPCTopologyBuilder 这个东西是不建议使用的(我这里的版本是0.9.3)。

源码上有这么一行:

Trident subsumes the functionality provided by this class, so it's deprecated

大概意思就是trident这个东西已经包含了LinearDRPCTopologyBuilder 当中的功能。

trident是什么意思?翻译了一下,【三叉戟】,靠,看起来很牛逼的样子。必须试试。

那么上第二份代码:

public class TridentDRPCTopology {
    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        StormSubmitter.submitTopologyWithProgressBar("word-count", conf, buildTopology());
    }

    public static StormTopology buildTopology() {
        TridentTopology topology = new TridentTopology();

        topology.newDRPCStream("word-count").
                each(new Fields("args"), new Split(), new Fields("word")).
                groupBy(new Fields("word")).
                aggregate(new One(), new Fields("one")).
                aggregate(new Fields("one"), new Sum(), new Fields("word-count"));
        return topology.build();
    }

    public static class Split extends BaseFunction {
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            String sentence = tuple.getString(0);
            for (String word : sentence.split(" ")) {
                collector.emit(new Values(word));
            }
        }
    }

    public static class One implements CombinerAggregator {
        @Override
        public Integer init(TridentTuple tuple) {
            return 1;
        }

        @Override
        public Integer combine(Integer val1, Integer val2) {
            return 1;
        }

        @Override
        public Integer zero() {
            return 1;
        }
    }
}

这个topology的功能要稍稍复杂一些,给出一句话,查一下一共有多少个词,当然了,不能重复计数。main函数当中非常简单,提交一个topology。而这个topology的构建过程是在buildTopology当中完成的。

        topology.newDRPCStream("word-count").
                each(new Fields("args"), new Split(), new Fields("word")).    //用空格分词
                groupBy(new Fields("word")).    //分组
                aggregate(new One(), new Fields("one")).    //给每组的数量设定为1
                aggregate(new Fields("one"), new Sum(), new Fields("word-count"));    //sum计算总和

这样的方式看起来跟spark当中对RDD的操作是有些像的。

好了,还是打包,提交。

然后是客户端测试:

        DRPCClient client = new DRPCClient("192.168.1.118", 3772);
        String result = client.execute("word-count", "mywife asdf asdf asdfasdfasfweqw saaa weweew");
        System.out.println(result);

到此,关于“Storm分布式RPC怎么配置”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注创新互联网站,小编会继续努力为大家带来更多实用的文章!


网站栏目:Storm分布式RPC怎么配置
转载来于:http://myzitong.com/article/psedje.html