推荐答案
-- -------------------- ---- ------- ------ ------------------------ ------ ------------------------------ ------ ------------------------------------------ ------ ------------------------------ ------ ----------------------------- ------ ----- ----------------- - ------ ------ ---- ------------- ----- - -- ------- --------------- ------- - --- ------------------ -- ------- ------------------------------ --- ------------ --- -- ------ ----------------------------- --- -------------------- --------------------------------- ----------------------------- --- ---------------- ------------------------------- --- ---------------- -- ------ ------ ------ - --- --------- ---------------------- -- -------- ------------ ------- - --- --------------- ------------------------------------------- ------- -------------------------- -- ----------- ------------------- ------------------------------------------ ------------------- - -
本题详细解读
1. 创建拓扑构建器
首先,我们需要创建一个 TopologyBuilder
对象,用于构建 Storm 拓扑。拓扑构建器允许我们定义数据流的来源(Spout)和处理逻辑(Bolt)。
TopologyBuilder builder = new TopologyBuilder();
2. 设置Spout
Spout 是拓扑中的数据源,负责从外部数据源读取数据并发射到拓扑中。在这个例子中,我们使用了一个名为 WordSpout
的自定义 Spout,并将其命名为 word-spout
。
builder.setSpout("word-spout", new WordSpout(), 1);
3. 设置Bolt
Bolt 是拓扑中的处理单元,负责处理从 Spout 或其他 Bolt 发射过来的数据。在这个例子中,我们定义了两个 Bolt:
SplitSentenceBolt
:用于将句子拆分为单词。WordCountBolt
:用于统计每个单词的出现次数。
builder.setBolt("split-bolt", new SplitSentenceBolt(), 2).shuffleGrouping("word-spout"); builder.setBolt("count-bolt", new WordCountBolt(), 2).fieldsGrouping("split-bolt", new Fields("word"));
4. 创建配置对象
Config
对象用于配置拓扑的运行参数。在这个例子中,我们启用了调试模式。
Config config = new Config(); config.setDebug(true);
5. 本地模式运行拓扑
为了在本地测试拓扑,我们使用 LocalCluster
来提交拓扑。LocalCluster
是一个本地运行的 Storm 集群,适合开发和测试。
LocalCluster cluster = new LocalCluster(); cluster.submitTopology("WordCountTopology", config, builder.createTopology());
6. 停止拓扑
在拓扑运行一段时间后,我们可以通过调用 killTopology
方法来停止拓扑,并通过 shutdown
方法关闭本地集群。
Utils.sleep(10000); cluster.killTopology("WordCountTopology"); cluster.shutdown();
通过以上步骤,我们成功创建并运行了一个简单的 Storm 拓扑,用于统计单词的出现次数。