Storm 笔记

Posted by Fioncat on April 22, 2018

Apache Storm是一个开源的分布式实时计算系统,可以简单的,可靠地处理大量的数据流。Storm可用于实时分析、在线机器学习、持续计算、分布式RPC等。

Storm部署和运维很便捷,并且支持多种编程语言的开发。

结构

Storm的结构称为topology。由stream、spout、bolt组成。

topography维护了一个拓扑结构,其中,spout可以从外部获取数据,随后将数据传给bolt。bolt分析数据并将计算结果传给其它bolt。一个bolt可以把数据传给多个其它bolt,也可以接收来自多个bolt的数据。形成了一个网络,在网络中流动的数据被叫做stream。

Storm的topography会一直运行下去,直到进程被杀死或者取消部署。

Stream

Storm的核心数据结构是tuple,本质包含一个或者多个键值对列表。

在Stream中流动的数据就是tuple,Stream就是无限个tuple组成的队列。

spout

spout连接到数据源,并将数据源转换为一个一个的tuple,随后将tuple作为数据流进行发送。开发spout的过程实际上就是实现从数据源到数据流的转换。

spout的数据源没有任何限制,可以是网络、文件、数据库等。

spout不负责数据的运算,不处理业务逻辑,仅仅进行连接数据源,转换为tuple发送数据,所以可以很方便地复用spout。

bolt

bolt主要用于数据的计算,将接收到的数据按照业务进行计算后,选择性地输出一个或者多个输出流。

一个bolt可以接收多个由spout或bolt发送的数据流,从而可以组建出复杂的数据转换和处理的复杂拓扑结构。

bolt一般用于过滤、连接和聚合、计算数据。可以分布在Storm集群的不同节点上,实现分布式计算。

这样的结构其实和工厂的流水线式的生产方式类似。一个bolt做的事情可能非常简单,但是组合很多个bolt就可以实现一个复杂的计算任务。

开发流程

Storm提供了一个单机测试的环境,能够让我们不需要搭建Storm集群来测试程序。我们先在单机模式下演示Storm,后面再移植到集群中。

在工程中需要导入storm的开发包。不同版本的代码一般是通用的。

下面以一个简单的WordCount案例来演示Storm的开发流程。

首先,我们需要构造出WordCount任务的拓扑结构,这需要设计出spout和bolt:

  • SentenceSpout: 获取来自外部的语句,并发送给SplitSentenceBolt。
  • SplitSentenceBolt: 对语句中按照单词进行切分,并发送给WordCountBolt。
  • WordCountBolt: 统计每个单词出现了多少次,发送给ReportBolt。
  • ReportBolt: 打印结果。

这个拓扑是最简单的拓扑,是一条线的处理流程。

首先,我们开发SentenceSpout。注意所有的Storm组件都要直接或者间接地实现IComponent接口,所有Spout需要实现ISpout接口。直接实现这两个接口需要覆写大量方法,我们可以继承BaseRichSpout这个类,这个类对于很多方法做了空实现,我们可以选择性地覆写这些方法,但是对于open,declareOutputFields,nextTunple这三个方法必须实现。

简单起见,我们直接在类中定义一个sentences,wc直接处理这个sentence:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package cn.lazycat.bdd.storm.wc;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import java.util.Map;

public class SentenceSpout extends BaseRichSpout {

    // 需要处理的sentences
    private String[] sentences = {
            "hello Storm", "I like Big Data", "I like Storm",
            "goodbye Storm", "goodbye Big Data"
    };

    // 当前发送的索引
    private int index = 0;

    // 要发送的collector
    private SpoutOutputCollector collector = null;

    /**
     * 初始化,在Spout被集群调用的时候,会调用这个方法。
     * @param conf 整个topology的配置信息。
     * @param context 整个topology的上下文对象,可以取出topology的一些信息。
     * @param collector 用于从当前spout发射tuple,tuple可以在任意时间发送。
     *                  collector应该被保存在spout对象中作为局部变量使用。
     */
    @Override
    public void open(Map conf, TopologyContext context,
                     SpoutOutputCollector collector) {

        // 保存collector,方便发送数据
        this.collector = collector;

    }

    /**
     * 用于声明输出流,实际上就是声明流的编号、
     * 发射的tuple的结构、是否是指向型的流等。
     * @param declarer 输出流的描述
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

        // tuple只有一个字段,表示每一句话
        declarer.declare(new Fields("sentence"));

    }

    /**
     * 当这个方法调用的时候,Storm要求这个Spout向collector发送tuple。
     * 这个方法应该是非阻塞的,所以如果当前没有tuple发送,则直接返回。
     * 这个方法会在一个很紧密的循环中不停地被调用。如果没有任何tuple发送,
     * 最好让nextTuple睡眠一小段时间,例如1毫秒,以让线程不浪费过多CPU。
     */
    @Override
    public void nextTuple() {

        if (index < sentences.length) {
            // 发射数据,需要接收一个List<Tuple>,表示发射的数据
            // 可以使用Values来简化,它可以直接接受多个Object并将
            // 它们转换为List<Tuple>发射
            collector.emit(new Values(sentences[index++]));
        }
        else {
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
                throw new RuntimeException(e);
            }
        }

    }

}

这就开发了一个最简单的Spout。

下面我们开发SplitSentenceBolt。需要实现IComponent和IBolt接口,我们可以继承BaseRichBolt类。

我们需要实现prepare、execute、declareOutputFields方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package cn.lazycat.bdd.storm.wc;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.Map;

public class SplitSentenceBolt extends BaseRichBolt {

    private OutputCollector collector = null;

    /**
     * Bolt初始化的时候调用。参数和作用和Spout的open是一样的。
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context,
                        OutputCollector collector) {
        this.collector = collector;
    }

    /**
     * 和Spout的也是一样,表示这个Bolt发送的Tuple的结构
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    /**
     * 处理单一的接收到的tuple。传递的tuple包含了这个tuple来自哪里的信息。
     * tuple可以通过来自prepare的collector对象发送。
     * @param input 传递的tuple对象
     */
    @Override
    public void execute(Tuple input) {
        // 取得语句
        String sentence = input.getStringByField("sentence");

        // 拆分单词,发送
        String words[] = sentence.split(" ");
        for (String word : words) {
            collector.emit(new Values(word));
        }
    }

}

WordCountBolt的编写就很简单了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package cn.lazycat.bdd.storm.wc;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.HashMap;
import java.util.Map;

public class WordCountBolt extends BaseRichBolt {

    private OutputCollector collector = null;

    // 保存单词和个数
    private Map<String, Integer> wordMap = null;

    @Override
    public void prepare(Map stormConf, TopologyContext context,
                        OutputCollector collector) {
        this.collector = collector;
        this.wordMap = new HashMap<>();
    }

    @Override
    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        if (wordMap.containsKey(word)) {
            wordMap.put(word, wordMap.get(word) + 1);
        }
        else {
            wordMap.put(word, 1);
        }

        // 此时单词的数量发生了变化,应该发射这个变化给下一层
        collector.emit(new Values(word, wordMap.get(word)));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // word表示单词,count表示单词出现的次数
        declarer.declare(new Fields("word", "count"));
    }
}

每当接受到上一层Bolt传来的数据,就表示单词数量发生了变化,WordCountBolt就会在内部记录下这个变化(在Map中保存)并且发送给下一层。整个过程是动态的。

最后,就是输出结果的ReportBolt:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package cn.lazycat.bdd.storm.wc;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

import java.util.Map;

public class ReportBolt extends BaseRichBolt {
    @Override
    public void prepare(Map stormConf,
                        TopologyContext context, OutputCollector collector) {

    }

    @Override
    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        int count = input.getIntegerByField("count");

        System.out.println("word change => word: "
                + word + ", count: " + count);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }
}

以上,所有Spout和Bolt开发完毕,接下来需要建立它们的Topology并且运行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package cn.lazycat.bdd.storm.wc;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class WordCount {

    public static void main(String[] args) throws InterruptedException {
        // 创建组件
        SentenceSpout spout = new SentenceSpout();
        SplitSentenceBolt splitBolt = new SplitSentenceBolt();
        WordCountBolt wcBolt = new WordCountBolt();
        ReportBolt reportBolt = new ReportBolt();

        // 拓扑构建者,用于组件组件
        TopologyBuilder builder = new TopologyBuilder();

        // 告诉构建者描述拓扑结构
        // 每个组件都必须有独一无二的名称
        builder.setSpout("sentence_spout", spout);
        // 告诉builder split_bolt 插在 sentence_spout 后面
        builder.setBolt("split_bolt", splitBolt).shuffleGrouping("sentence_spout");
        // wc_bolt 插在 split_bolt 后面
        builder.setBolt("wc_bolt", wcBolt).fieldsGrouping(
                "split_bolt", new Fields("word")
        );
        // report_bolt 插在 wc_bolt 后面,并且是最后一个bolt。
        builder.setBolt("report_bolt", reportBolt).globalGrouping("wc_bolt");

        // 创建拓扑
        StormTopology topology = builder.createTopology();

        // 创建本地集群,模拟运行
        LocalCluster cluster = new LocalCluster();
        Config conf = new Config();
        cluster.submitTopology("wc", conf, topology);

        // 10 秒后停止拓扑,否则拓扑会一直执行下去
        Thread.sleep(10000);
        cluster.killTopology("wc");
        cluster.shutdown();
    }

}

Storm是实时分析的(来一条数据分析一次),一般来说会一直运行下去,这里我们在10秒后停止Storm。

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
word change => word: hello, count: 1
word change => word: Storm, count: 1
word change => word: I, count: 1
word change => word: like, count: 1
word change => word: Big, count: 1
word change => word: Data, count: 1
word change => word: I, count: 2
word change => word: like, count: 2
word change => word: Storm, count: 2
word change => word: goodbye, count: 1
word change => word: Storm, count: 3
word change => word: goodbye, count: 2
word change => word: Big, count: 2
word change => word: Data, count: 2

并发控制

在一个Storm集群中,topology由下面四个主要的部分组成:

  • Node: 服务器,指的是配置在集群中的一台服务器,会执行topology的一部分任务。提供集群级别的并发支持。
  • Worker: 任务进程,指的是在Node上运行的JVM进程,提供在Node级别的并发支持。
  • Executor: 线程。指的是worker中执行任务的线程,提供worker级别的并发支持。
  • Task: spout和bolt的实例,如果一个spout和bolt任务有多个task,那么Storm会创建它们的多个实例。executor会调用这些实例的nextTuple()和execute()方法,这就是执行task。一个executor可能会执行多个task。

如果不明确指定,Storm的默认并发级别是1。也就是每个Node只有一个worker,每个executor执行一个task,每个spout和bolt只有一个task。唯一的并发是线程,也就是worker可能有多个executor(取决于你有多少个spout和bolt)。

我们可以手动增加并发度。我们可以使用API或者修改配置文件来修改并发度。

1
2
3
4
5
conf.setNumWorkers(num);  // 设置worker
builder.setSpout("spout_name", spout, numberSpout);  // 设置spout executor
builder.setBolt("bolt_name", bolt, numberBolt);  // 设置 bolt executor
builder.setSpout(...).setNumTasks(taskNumber);  // 设置spout有几个task
builder.setBolt(...).setNumTasks(taskNumber);  // 设置bolt有几个task

注意在单机模式增加worker没有任何效果。

如果一个bolt有多个task,其上层在发射数据的时候,需要选择把数据发送给哪一个task,这涉及到数据流的分组问题。

Storm提供了7种分组方式,也就是tuple发送给下一层task的策略:

  1. Shuffle Grouping,随机分组,随机分发数据给bolt的各个task,每个task接受到的tuple数量相等。
  2. Fields Grouping,字段分组,根据指定字段的值进行分组,指定字段具有相同值的tuple会被发射给同一个task。
  3. Global Grouping,所有的tuple都发射到唯一的一个task上,Storm按照最小的task id来选取对应的task。如果使用这种分组方式,配置bolt的task数量没有任何意义。但是这种方式会导致大量tuple发送到一个Storm Node上,会导致这个Node成为热点。
  4. All Grouping,全分组,指的是tuple会被复制然后发送到下一层的所有task上。
  5. Node Grouping,不分组,效果和Shuffle一样,保留。
  6. Direct Grouping,指向型分组,会通过emitDirect()方法来判断一个tuple应该由哪个他上课接收。
  7. Local or Shuffle Grouping,本地或随机分组,和随机分组类似,但是优先把tuple发送给本地(当前进程)存在的task,可以减少网络传输。

选择策略是在告知builder下一层bolt是谁的时候调用的。直接在setBolt()后调用对应的Grouping方法即可。

重新看wc案例,我们发现分组的策略是这样的:

  • Spout发送数据给SplitSentenceBolt,使用Shuffle Grouping,因为切分单词的操作都是一样的,发给哪个task处理无所谓。
  • SplitSentenceBolt发送数据给WcBolt,如果使用Shuffle Grouping,每个task是不同的实例,它们保存的单词计数可能是不一样的,所以会产生统计不一致的问题。所以需要使用Fields Grouping,这就保证切分结果中word相同的tuple会发送给同一个task,避免了数据不一致的问题。
  • 所有WcBolt统计得到的数据全部都要发给一个ReportBolt,所以使用Global Grouping。

可靠性

Storm提供了数据流处理的可靠性。指的是每个tuple能够按照意愿正确地被处理,保证最终的处理结果是正确的。

Storm支持实现三个级别的可靠性:

  • 保证tuple不会因为网络原因丢失,tuple一定能被处理完毕。
  • 保证tuple不会被重复处理,也就是所有tuple只会被处理一次。
  • 保证tuple按照顺序处理。

可靠性越高,Storm的效率就会越低。在默认情况下,Storm没有启用可靠性支持,也就是不进行任何可靠性检查。

下面具体来看三个可靠性的详情:

所有tuple至少被处理一次

这个级别的可靠性,spout需要记录它发射的tuple。下游bolt处理完毕tuple后会返回一个报告,如果失败,则会重新发送一遍tuple。bolt在处理tuple时,不论处理成功或失败,都要向spout汇报。

关于可靠性,在ISpout中有两个相关的接口:ack(), fail()。其中,ack()表示数据处理成功调用的方法,fail()表示数据发送失败时调用的方法。

一般来说,spout维护一个已经发送tuple的队列。当ack()方法调用时,删除队列中处理成功的tuple,当fail()方法调用时,重新发送数据。

在nextTuple()方法中,需要指定每个发送tuple的id,以唯一标识这个tuple,实现监控这个tuple的状态。

wc案例中为spout增加第一级别可靠性支持:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@Override
public void nextTuple() {

    if (index < sentences.length) {
        // emit多传一个参数,表示这个tuple的编号
        // 消息成功或者失败,这个编号会被返回给ack或fail
        collector.emit(new Values(sentences[index]), index);

        // 增加发送的tuple的编号
        send.add(index++);
    }
    else {
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }

}

// 发送的tuple的编号
private List<Integer> send = new ArrayList<>();

@Override
public void ack(Object msgId) {
    Integer ackIndex = (Integer) msgId;
    // 删除发送的编号
    Iterator<Integer> ite = send.iterator();
    while (ite.hasNext()) {
        if (ite.next().equals(ackIndex)) {
            ite.remove();
        }
    }
}

@Override
public void fail(Object msgId) {
    // 重新发送数据
    Integer badIndex = (Integer) msgId;
    collector.emit(new Values(sentences[badIndex]), badIndex);
}

bolt中,在发射衍生的tuple,需要先锚定上一级的tuple。也就是在发射子tuple前,将上一级tuple和衍生tuple的关系进行绑定。这样下游的bolt在ack/fail时,能够告知spout要对哪个最初的tuple调用ack()或者fail()方法。

锚定操作通过collector完成,这样collector就能够知道tuple之间的“族系”关系。

通过调用collector的重载的emit可以实现锚定:

1
collector.emit(tuple, new Values(...));

这就将新的Values和原来的tuple之间做了一个关系绑定。

当处理完成tuple,需要应答上一层ack/fail。这个方法通过collector调用,传入处理成功或者失败的tuple:

1
2
collector.ack(tuple);
collector.fail(tuple);

于是乎我们可以改进SplitBolt的execute()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Override
public void execute(Tuple input) {
    try {
        String sentence = input.getStringByField("sentence");
        String words[] = sentence.split(" ");
        for (String word : words) {
            // 将衍生的tuple和原生tuple进行锚定
            collector.emit(input, new Values(word));
        }

        // 处理成功
        collector.ack(input);
    } catch (Exception e) {
        e.printStackTrace();

        // 产生异常,处理失败
        collector.fail(input);
    }
}

事务型拓扑

事实上,后两个级别的可靠性是可以一起实现的。

因为只要保证tuple是按照顺序发送的,那么就可以记录下最后一次处理成功的tuple的编号。这样在每次接收到tuple的时候,判断这个编号和最后一次的编号大小,如果小于,则拒绝处理。

我们可以一次发送一批tuple,给每一批数据设置一个编号,当这一批数据完全被处理完毕后,再发送下一批的数据。这样在bolt中可以根据批的编号去判断。批内的tuple是没有顺序的,能够并发执行。

这个批应该符合事务的规范,要么全部一起被完成,要么全部一起失败。

但是这种方案存在缺点,因为在bolt处理的时候,其它bolt是不能够处理数据的,也就是这时候它们是空闲的,导致服务器资源被浪费。

Storm的解决方案是,对于一些bolt,并没有tuple只需要处理一次的需求,例如wc中,SplitBolt和ReportBolt并没有tuple只处理一次的需求。因为切分单词和打印结果重复进行对于整个处理结果事实上并没有什么影响。但是,WcBolt却不一样,如果重复统计一个单词的数量,就会造成最终结果的错误。所以我们可以只保证某些特殊的bolt是不会重复处理的。

对于不需要只被处理一次的bolt,成为process阶段的bolt;对于严格需要只处理一次的bolt,称为commit阶段的bolt。

在发送数据的时候,还是一批一批的发,每一个批次有一个递增的编号,在process阶段,批任意并发,即使是重复的数据,也会处理。在commit阶段,会严格记录当前编号,所有批次必须按序进行处理。对于比当前编号小的批,丢弃。

上述的可靠性支持,原生的API是不支持的。是Storm另外开发的,叫做TransactionTopology。这套API的使用方法和原来的不同。

我们需要写一个事务型的spout,继承自BaseTransactionalSpout。

这个类涉及到两个对象:coordinator和emitter。其中,coordinator负责组织批的信息,和后续coordinator保持联系,如果发现后面的数据丢失,重新发送数据;emitter负责得到批的信息后真正地进行信息进行发送。这二者之间通过MetaData进行交互。

TransactionSpout自带第一级别的可靠性,不需要编写任何额外的代码。

除了TransactionSpout,我们还需要开发手动开发coordinator。这需要我们继承Coordinator这个接口,泛型表示MetaData的类型。

在wc案例中,我们编写一个专门返回句子的类,让coordinator去调用这个类以取出数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package cn.lazycat.bdd.storm.wc.trans;

public class SentenceGetter {

    private static String[] sentences = {
            "hello Storm", "I like Big Data", "I like Storm",
            "goodbye Storm", "goodbye Big Data", "hello cat",
            "I am very boring", "goodbye cat", "he he he"
    };

    public static String getSentence(int i) {
        return sentences[i];
    }

    public static int length() {
        return sentences.length;
    }
}

我们还需要一个MetaData,用于规定coordinator和emmiter之间的数据交互方式,在本例中,coordinator需要告知emmiter需要传输多少行的数据,因此需要指定开始行和结束行,注意这个类需要进行序列化:

1
2
3
4
5
6
7
8
9
10
11
12
package cn.lazycat.bdd.storm.wc.trans;

public class SentenceMetaData implements java.io.Serializable {

    // 表示一个批次的开始行
    private int startRow;

    // 表示一个批次的结束行
    private int endRow;

    // getter setter 略
}

在coordinator中,实现组织事务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package cn.lazycat.bdd.storm.wc.trans;

import backtype.storm.transactional.ITransactionalSpout.Coordinator;

import java.math.BigInteger;

public class TransactionCoordinator implements Coordinator<SentenceMetaData> {

    private boolean hasMore = true;  // 是否有更多的数据

    private int batchSize = 3;  // 每一批的数据个数

    /**
     * 在isReady方法返回true时,Storm会调用这个方法来真正启动一个事务。
     * Storm会传入这个批的编号和上一个批的元数据信息,要求返回新的批的
     * 元数据信息。
     * @param txid 新的批的编号
     * @param prevMetadata 上一个批的元数据信息
     * @return 新的批的元数据信息
     */
    @Override
    public SentenceMetaData initializeTransaction(BigInteger txid,
                                                  SentenceMetaData prevMetadata) {
        int startRow, endRow;
        // 上一次没有元数据,表示这个事务是第一个事务
        if (prevMetadata == null) {
            startRow = 0;
        }
        else {
            // 从上一批的结束开始
            startRow = prevMetadata.getEndRow();
        }

        // 如果剩余的数据比bathSize大,取出下一批,否则,取出全部数据
        if (SentenceGetter.length() - startRow > batchSize) {
            endRow = startRow + batchSize;
        }
        else {
            endRow = SentenceGetter.length();
            hasMore = false;  // 没有更多数据了
        }

        SentenceMetaData meta = new SentenceMetaData();

        meta.setStartRow(startRow);
        meta.setEndRow(endRow);

        return meta;
    }

    /**
     * 如果已经准备好发送一个新的事务,返回true。如果要跳过这个事务,返回false。
     * 如果在下一个事务开始前,想有一点延迟,可以在这个方法中沉睡一会。
     * Storm会循环调用这个方法来询问是否准备好发送一批数据。
     * @return 是否准备好发送下一批数据
     */
    @Override
    public boolean isReady() {
        return hasMore;
    }

    /**
     * 销毁对象前调用,用来释放资源
     */
    @Override
    public void close() {

    }
}

Storm会不停问coordinator询问SentenceMetaData,如果有了,会把这个数据发送给Emitter,所以接下来我们需要开发Emitter:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package cn.lazycat.bdd.storm.wc.trans;

import backtype.storm.coordination.BatchOutputCollector;
import backtype.storm.transactional.ITransactionalSpout.Emitter;
import backtype.storm.transactional.TransactionAttempt;
import backtype.storm.tuple.Values;

import java.math.BigInteger;

public class TransactionEmitter implements Emitter<SentenceMetaData> {

    /**
     * 批量发送数据,这利用collector实现。
     * 可以接收到来自coordinator的元数据,根据元数据发送数据。
     * 对于同样事务编号,必须发送同样的数据。这样在批发送失败
     * 后能够重新发送,并保证同样编号的批是一致的。
     * 所有批的第一个字段必须是tx。
     * @param tx 事务编号
     * @param meta 来自coordinator的元数据
     * @param collector 根据它来发射数据
     */
    @Override
    public void emitBatch(TransactionAttempt tx,
                          SentenceMetaData meta,
                          BatchOutputCollector collector) {

        int start = meta.getStartRow();
        int end = meta.getEndRow();

        // 发射数据
        for (int i = start; i < end; ++i) {
            String sentence = SentenceGetter.getSentence(i);
            collector.emit(new Values(tx, sentence));
        }

    }

    /**
     * 清理事务的状态。Emitter需要持有一些历史的事务以进行重发。
     * 当事务执行成功后,Storm会执行这个方法,把执行成功的事务
     * 编号传入,以让Emitter清理已经成功的事务编号。
     * @param txid 成功处理的批的事务编号
     */
    @Override
    public void cleanupBefore(BigInteger txid) {

    }

    @Override
    public void close() {

    }
}

在本次业务中,我们并没有实现cleanupBefore()和close()的需求。

随后开发Spout,指定我们自定义的coordinator和emitter:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package cn.lazycat.bdd.storm.wc.trans;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseTransactionalSpout;
import backtype.storm.tuple.Fields;

import java.util.Map;

public class TransactionSentenceSpout
        extends BaseTransactionalSpout<SentenceMetaData> {

    @Override
    public Coordinator<SentenceMetaData> getCoordinator(Map conf,
                                              TopologyContext context) {
        return new TransactionCoordinator();
    }

    @Override
    public Emitter<SentenceMetaData> getEmitter(Map conf,
                                      TopologyContext context) {
        return new TransactionEmitter();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // 需要增加一个txid,这个值由Storm自动赋值
        declarer.declare(new Fields("txid", "sentence"));
    }
}

随后我们需要开发Bolt,继承BaseBatchBolt接口,接收泛型表示事务编号的类型,表示这个bolt是处理批量tuple的bolt。

下面开发wc的三个bolt:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package cn.lazycat.bdd.storm.wc.trans;

import backtype.storm.coordination.BatchOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBatchBolt;
import backtype.storm.transactional.TransactionAttempt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.*;

public class SplitSentenceTransactionBolt extends BaseBatchBolt<TransactionAttempt> {

    private BatchOutputCollector collector;

    // 缓存的待发送的单词
    private List<String> wordsBuffer = new ArrayList<>();

    // 当前的事务id
    private TransactionAttempt txid = null;

    @Override
    public void prepare(Map conf, TopologyContext context,
                        BatchOutputCollector collector, TransactionAttempt id) {

        this.collector = collector;
        this.txid = id;
    }

    /**
     * 批中的所有tuple会进入此方法进行处理
     * @param tuple 当前处理的tuple
     */
    @Override
    public void execute(Tuple tuple) {
        String sentence = tuple.getStringByField("sentence");
        String words[] = sentence.split(" ");

        // 在内存中缓存要发送的单词
        wordsBuffer.addAll(Arrays.asList(words));
    }

    /**
     * 当批中的所有tuple都被处理完后,会调用这个方法,处理整个批
     */
    @Override
    public void finishBatch() {
        // 发送所有缓存的tuple
        for (String word : wordsBuffer) {
            collector.emit(new Values(txid, word));
        }

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("txid", "word"));
    }
}

注意wc bolt是commit的,所以要保存最后一次处理的批次编号,在接收数据的时候如果发现数据编号比最后一次的小,那么要拒绝数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package cn.lazycat.bdd.storm.wc.trans;

import backtype.storm.coordination.BatchOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBatchBolt;
import backtype.storm.transactional.TransactionAttempt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class SentenceWcBolt extends BaseBatchBolt<TransactionAttempt> {

    private BatchOutputCollector collector = null;

    private TransactionAttempt txid;

    // 最后一个批的编号
    // 真正的开发中这个数值应该储存在外部,因为一旦设置并发度,
    // 可能有多个Node,多个worker,无法保证用到同一个lastTransId
    private static int lastTransId = 0;

    // 保存全局的单词计数
    private static Map<String, Integer> wordCount = new ConcurrentHashMap<>();

    // 表示当前批次统计的数据
    private Set<String> curWords = new LinkedHashSet<>();

    @Override
    public void prepare(Map conf, TopologyContext context,
                        BatchOutputCollector collector, TransactionAttempt id) {

        this.collector = collector;
        this.txid = id;
    }

    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        if (wordCount.containsKey(word)) {
            wordCount.put(word, wordCount.get(word) + 1);
        }
        else {
            wordCount.put(word, 1);
        }
        curWords.add(word);
    }

    @Override
    public void finishBatch() {
        if (txid.getTransactionId().intValue() <= lastTransId) {
            return;
        }
        else {
            for (String word : curWords) {
                collector.emit(new Values(txid, word, wordCount.get(word)));
            }
        }
        // 更新最后一次编号
        lastTransId = txid.getTransactionId().intValue();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("txid", "word", "count"));
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
package cn.lazycat.bdd.storm.wc.trans;

...
public class TransactionReportBolt extends BaseBatchBolt<TransactionAttempt> {
...
    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        int count = tuple.getIntegerByField("count");

        System.out.println("word change => word:" + word + ",count = " + count);
    }
...
}

最后把它们组装成拓扑并运行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package cn.lazycat.bdd.storm.wc.trans;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.StormTopology;
import backtype.storm.transactional.TransactionalTopologyBuilder;
import backtype.storm.tuple.Fields;

public class TransactionWC {

    public static void main(String[] args) throws InterruptedException {
        TransactionSentenceSpout spout = new TransactionSentenceSpout();
        SplitSentenceTransactionBolt splitBolt = new SplitSentenceTransactionBolt();
        SentenceWcBolt wcBolt = new SentenceWcBolt();
        TransactionReportBolt reptBolt = new TransactionReportBolt();

        TransactionalTopologyBuilder builder = new
                TransactionalTopologyBuilder("Trans_Sentence_Topology",
                "trans_sentence_spout", spout);
        builder.setBolt("trans_split_sentence_Bolt",
                splitBolt).shuffleGrouping("trans_sentence_spout");
        // word count bolt 是一个commit的bolt
        builder.setCommitterBolt("trans_word_count_bolt", wcBolt).
                fieldsGrouping("trans_split_sentence_Bolt", new Fields("word"));
        builder.setBolt("trans_report_bolt", reptBolt).
                globalGrouping("trans_word_count_bolt");

        StormTopology topology = builder.buildTopology();
        LocalCluster cluster = new LocalCluster();
        Config conf = new Config();
        cluster.submitTopology("Trans_Sentence_Topology", conf, topology);

        Thread.sleep(10000);
        cluster.killTopology("Trans_Sentence_Topology");
        cluster.shutdown();
    }

}

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
word change => word:hello,count = 1
word change => word:Storm,count = 2
word change => word:I,count = 2
word change => word:like,count = 2
word change => word:Big,count = 1
word change => word:Data,count = 1
word change => word:goodbye,count = 2
word change => word:Storm,count = 3
word change => word:Big,count = 2
word change => word:Data,count = 2
word change => word:hello,count = 2
word change => word:cat,count = 1
word change => word:I,count = 3
word change => word:am,count = 1
word change => word:very,count = 1
word change => word:boring,count = 1
word change => word:goodbye,count = 3
word change => word:cat,count = 2
word change => word:he,count = 3

如果不显示声明commit和process阶段,默认全部是process阶段,如果要让一个bolt成为commit阶段,可以让bolt实现ICommitter接口,或是调用setCommitBolt()方法(如例子中所示)。

事实上上面的事务型拓扑已经被过时掉了。因为这套API设计的并不好,我们发现我们编码的时候需要花很多精力在Transaction的维护上,而不是专注于业务。我们需要对Storm事务处理流程比较了解才能够实现这种代码。

所以这份API被作者过时了。提供了一套新的Trident API。但是Trident API比较抽象,用的也比较少,所以这里也不再介绍。

集群搭建

  • Storm集群遵循主/从结构。
  • Storm的主节点是半容错的。
  • Strom集群由一个主节点(nimbus)和一个或者多个工作节点(supervisor)组成。除此之外Storm集群还需要一个ZooKeeper的来进行集群协调。
  • nimbus守护进程主要的责任是管理,协调和监控在集群上运行的topology。包括topology的发布,任务的指派,事件处理失败时重新指派任务。
  • supervisor守护进程等待nimbus分配任务后生成并监控workers执行任务。supervisor和worker都是运行在不同的JVM进程上,如果supervisor启动的worker进程因为错误异常退出,supervisor将会尝试重新生成新的worker进程。

nimbus将topology发布到Storm集群,将预先打包成jar文件的topology和配置信息提交到nimbus服务器上,一旦nimbus接收到topology的压缩包,会将jar包分发到足够数量的supervisor节点上,当supervisor节点接收到了topology压缩文件,nimbus就会指派task到每个supervisor并且发送信号指示supervisor生成足够的worker来执行指派的task。nimbus记录所有的supervisor节点的状态和分配给他们的task。

如果nimbus发现某个supervisor没有上报心跳或者已经不可达了,他将会将故障supervisor分配的task重新分配到集群中的其他supervisor节点。

nimbus并不参与topology的数据处理过程,只是负责topology的初始化、任务分发和进程监控。因此,即使nimbus守护进程在topology运行时停止了,只要分配的supervisor和worker健康运行,topology会一直继续处理数据,所以称之为半容错机制。

搭建Storm集群需要ZK和jdk的支持,安装Storm只需要解压压缩包即可。

需要改变conf下的storm.yaml文件配置:

必须修改的项:

  • storm.zookeeper.services:配置zookeeper集群的主机名称。
  • nimbus.host:指定了集群中nimbus的节点。
  • supervisor.slots.ports:配置控制每个supervisor节点运行多少个worker进程。这个配置定义为worker监听的端口的列表,监听端口的个数控制了supervisor节点上有多少个worker的插槽。默认的storm使用6700~6703端口,每个supervisor节点上有4个worker插槽。
  • storm.local.dir:storm工作时产生的工作文件存放的位置,注意,要避免配置到/tmp下。

可选的常用修改项:

  • nimbus.childopts(default: -Xms1024m):这项JVM配置会添加在启动nimbs守护进程的java命令行中。
  • ui.port(default:8080):这项配置指定了Storm UI的Web服务器监听的端口。
  • ui.childopts(default:-Xms1024m):这项JVM配置会添加在StormUI服务启动的Java命令行中。
  • supervisor.childopts(default:-Xms768m):这项JVM配置会添加Supervisor服务启动的Java命令行中。
  • worker.childopts(default:-Xms768m):这项JVM配置会添加worker服务启动的Java命令行中。
  • topology.message.timeout.secs(default:30):这个配置项定义了一个tuple树需要应答最大时间秒数限制,超过这个时间则认为超时失败。
  • topology.max.spout.pending(default:null):在默认值null的情况下,spout每当产生新的tuple时会立即向后端发送,由于下游bolt执行可能具有延迟,可能导致topology过载,从而导致消息处理超时。如果手动将该值改为非null正整数时,会通过暂停spout发送数据来限制同时处理的tuple不能超过这个数,从而达到为Spout限速的作用。
  • topology.enable.message.timeouts(default:true):这个选项用来锚定的tuple的超时时间。如果设置为false,则锚定的tuple不会超时。

Storm常见命令:

  • 启动命令
    • storm nimbus 启动nimbus守护进程
    • storm supervisor 启动supervisor守护进程
    • storm ui 启动stormui的守护进程,从而可以通过webUI界面来监控storm运行过程
    • storm drpc 启动一个DRPC服务守护进程
  • 管理命令
    • storm jar topology_jar topology_class[arguments…] 向集群提交topology。它会使用指定的参数运行topology_class中的main()方法,同时上传topology_jar文件到nimbus以分发到整个集群。提交后,Storm集群会激活并且开始运行topology。topology中的main()方法需要调用StormSubmitter.submitTopology()方法,并且为topology提供集群内唯一的名称。
    • storm kill topology_name[-w wait_time] 用来关闭已经部署的topology。
    • storm deactivate topology_name 停止指定topology的spout发送tuple
    • storm activate topology_name 恢复指定topology的spout发送tuple。
    • storm rebalance topology_name -w wait_time -n worker_count -e component_name=executor_count 指定storm在集群的worker之间重新平均地分配任务,不需要关闭或者重新提交现有的topology。当执行rebalance命令时,Storm会先取消激活topology,等待配置的的时间使剩余的tuple处理完成,然后再supervisor节点中均匀的重新分配worker。重新分配后,Storm会将topology恢复到之前的激活状态。
    • storm remoteconfvalue conf-name 用来查看远程集群中的配置参数值。