This section describes how to convert and run a complete Storm topology developed using Storm API.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-storm_2.11</artifactId>
<version>1.4.0</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.4.0</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.4.0</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
If the project is not a non-Maven project, manually collect the preceding JAR packages and add them to the classpath environment variable of the project.
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentenceSpout(), 5);
builder.setBolt("split", new SplitSentenceBolt(), 8).shuffleGrouping("spout");
builder.setBolt("count", new WordCountBolt(), 12).fieldsGrouping("split", new Fields("word"));
Config conf = new Config();
conf.setNumWorkers(3);
StormSubmitter.submitTopology("word-count", conf, builder.createTopology());
Perform the following operations:
Config conf = new Config(); conf.setNumWorkers(3); //converts Storm Config to StormConfig of Flink. StormConfig stormConfig = new StormConfig(conf); //Construct FlinkTopology using TopologBuilder of Storm. FlinkTopology topology = FlinkTopology.createTopology(builder); //Obtain the Stream execution environment. StreamExecutionEnvironment env = topology.getExecutionEnvironment(); //Set StormConfig to the environment variable of Job to construct Bolt and Spout. //If StormConfig is not required during the initialization of Bolt and Spout, you do not need to set this parameter. env.getConfig().setGlobalJobParameters(stormConfig); //Submit the topology. topology.execute();