Storm Flux: Easy Streaming Deployment

With Flux for Apache Storm, deploying streaming topologies for real-time processing becomes less programmatic and more declarative. Deploying with Flux makes it less likely that you will have to recompile your project just because you have reconfigured or rearranged your topology. It leverages YAML, a human-readable serialization format, to describe a topology as a whole. You might still need to write some classes, but taking advantage of existing, generic implementations makes this less likely.

While Flux can also be used with an existing topology, for this post we’ll take the Hive-Streaming example you can find here (blog post) and create the required topology from scratch using Flux. For experiments and demo purposes you can use the following Vagrant setup to run an HDP cluster locally.

Making Flux Part of Your Project

The main Flux class can be used to deploy a topology described in YAML notation to Storm. With Flux, topologies are still deployed using the storm executable; the only difference is that Flux takes the place of the custom main class that would otherwise describe the topology. To ship Flux with an existing project, it needs to be on the classpath. Maven can be used to achieve this:

<dependency>
  <groupId>com.github.ptgoetz</groupId>
  <artifactId>flux-core</artifactId>
  <version>0.3.0</version>
</dependency>

Additionally, we can set Flux as the main class using the Maven Shade plugin as follows:

<plugins>
  <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>1.4</version>
...
       <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
         <mainClass>org.apache.storm.flux.Flux</mainClass>
       </transformer>
...
  </plugin>
</plugins>

Flux YAML Topology

Essentially, a topology consists of six core parts. It has a Name, topology-specific Config, class Components, Spouts, Bolts, and a Streams definition. All of these parts can be configured in a single YAML file:

name: "flux-hive-streaming-example"
config:
  topology.workers: 2

# defining required components
components: ...

# spout definitions
spouts: ...

# bolt definitions
bolts:  ....

# stream definitions
streams: ... # coupling of bolts and spouts to form a streaming topology

The topology relies on certain components for streaming stock prices to Hive. For one, it relies on the Hive Streaming bolt that comes with Storm. The other component is the Kafka spout, as stock prices get published to the stock_price topic. The Kafka spout also needs Zookeeper, which is likewise configured in the components section of the topology.

ZkHosts takes a comma-separated list of nodes, passed as a single string, as its constructor argument. This can be provided using the constructorArgs configuration. The configuration of Zookeeper looks as follows:

  - id: "zkHosts"
    className: "storm.kafka.ZkHosts"
    constructorArgs:
      - "127.0.0.1:2181"
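Since ZkHosts expects the whole list in one string, a multi-node Zookeeper setup still uses a single constructor argument. A small sketch with three made-up hostnames (they are placeholders, not part of the example cluster):

```yaml
  - id: "zkHosts"
    className: "storm.kafka.ZkHosts"
    constructorArgs:
      # one string, nodes separated by commas (hypothetical hostnames)
      - "zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181"
```

Splitting the nodes into separate list items would instead be read as three constructor arguments, which is not what ZkHosts takes here.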

The Zookeeper component can be referenced by the Kafka spout configuration, as this needs a ZkHosts instance as a constructor argument. Configuring our Kafka spout:

  - id: "stockSpoutConfig"
    className: "storm.kafka.SpoutConfig"
    constructorArgs:
      - ref: "zkHosts"  # brokerHosts
      - "stock_topic"   # topic
      - "/kafka_storm"  # zkRoot
      - "StormSpout"    # id

For Hive Streaming we need a HiveOptions instance to define batch sizes and timeouts. The most important part, however, is the definition of the column fields, including the partition fields. For this we need to configure the DelimitedRecordHiveMapper carefully. The configuration for Hive Streaming can be found below:

  - id: "stockPartitionFields"
    className: "backtype.storm.tuple.Fields"
    constructorArgs:
      - ["name"]
      
  - id: "stockColumnFields"
    className: "backtype.storm.tuple.Fields"
    constructorArgs:
      - ["day", "open", "high", "low", "close", "volume","adj_close"]

  - id: "stockDelimitedRecordHiveMapper"
    className: "org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper"
    configMethods:
      - name: "withPartitionFields"
        args:
          - ref: "stockPartitionFields"
      - name: "withColumnFields"
        args:
          - ref: "stockColumnFields"

  - id: "stockHiveOptions"
    className: "org.apache.storm.hive.common.HiveOptions"
    constructorArgs:
      - "thrift://one.hdp:9083"                # metaStoreURI
      - "default"                                # databaseName
      - "stock_prices"                           # tableName
      - ref: "stockDelimitedRecordHiveMapper"   # HiveMapper
    configMethods:
      - name: "withTxnsPerBatch"
        args:
          - 2
      - name: "withBatchSize"
        args:
          - 100
      - name: "withIdleTimeout"
        args:
          - 10

After the stock information is read from the Kafka topic, a simple bolt splits each line and emits the individual fields to Hive. The StockDataBolt does not take any arguments and can be configured directly in the bolts section. With this we have all the required configuration to define our stream:

# spout definitions
spouts:
  - id: "stockSpout"
    className: "storm.kafka.KafkaSpout"
    parallelism: 1
    constructorArgs:
      - ref: "stockSpoutConfig"

# bolt definitions
bolts:
  - id: "stockDataBolt"
    className: "storm_hive_streaming_example.StockDataBolt"
    
  - id: "stockHiveBolt"
    className: "org.apache.storm.hive.bolt.HiveBolt"
    constructorArgs:
      - ref: "stockHiveOptions"

# stream definitions
streams:
  - name: "stockSpout --> stockDataBolt" # name isn't used (placeholder for logging, UI, etc.)
    from: "stockSpout"
    to: "stockDataBolt"
    grouping:
      type: SHUFFLE
      
  - name: "stockDataBolt --> stockHiveBolt" # name not used
    from: "stockDataBolt"
    to: "stockHiveBolt"
    grouping:
      type: SHUFFLE
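Both streams above use shuffle grouping, which distributes tuples randomly across the tasks of the target bolt. Flux accepts other grouping types in the same place. As a sketch only, and assuming that StockDataBolt declares an output field called "name" (the post does not show the fields it declares), a fields grouping would send all tuples for one stock to the same HiveBolt task:

```yaml
streams:
  - name: "stockDataBolt --> stockHiveBolt"
    from: "stockDataBolt"
    to: "stockHiveBolt"
    grouping:
      type: FIELDS
      # assumes StockDataBolt declares an output field named "name"
      args: ["name"]
```

With every component running at a parallelism of 1, as in this example, the choice makes no practical difference; it may start to matter once the Hive bolt runs with several tasks.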

Deploying the Topology

Each topology is still deployed in the same manner using storm jar, only the main class is now Flux. The deployment can be local by supplying --local as a parameter, or remote with --remote. A full list of options can be seen here.

$ storm jar storm-hive-streaming-example-1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --remote flux-demo.yaml

The expected output can be seen below and should describe the kind of topology that was deployed:

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/hdp/2.3.0.0-2557/storm/lib/logback-classic-1.0.13.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/vagrant/storm-hive-streaming-example-1.0-SNAPSHOT.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder]
███████╗██╗     ██╗   ██╗██╗  ██╗
██╔════╝██║     ██║   ██║╚██╗██╔╝
█████╗  ██║     ██║   ██║ ╚███╔╝
██╔══╝  ██║     ██║   ██║ ██╔██╗
██║     ███████╗╚██████╔╝██╔╝ ██╗
╚═╝     ╚══════╝ ╚═════╝ ╚═╝  ╚═╝
+-         Apache Storm        -+
+-  data FLow User eXperience  -+
Version: 0.3.0
Parsing file: /home/vagrant/flux-demo.yaml
2121 [main] INFO  org.apache.storm.flux.parser.FluxParser - loading YAML from input stream...
2153 [main] INFO  org.apache.storm.flux.parser.FluxParser - Not performing property substitution.
2154 [main] INFO  org.apache.storm.flux.parser.FluxParser - Not performing environment variable substitution.
3622 [main] WARN  org.apache.storm.flux.FluxBuilder - Found multiple invokable constructors for class class backtype.storm.tuple.Fields, given arguments [[name]]. Using the last one found.
3625 [main] WARN  org.apache.storm.flux.FluxBuilder - Found multiple invokable constructors for class class backtype.storm.tuple.Fields, given arguments [[day, open, high, low, close, volume, adj_close]]. Using the last one found.
3641 [main] INFO  org.apache.storm.flux.FluxBuilder - Detected DSL topology...
4423 [main] INFO  backtype.storm.utils.Utils - Using defaults.yaml from resources
4559 [main] INFO  backtype.storm.utils.Utils - Using storm.yaml from resources
---------- TOPOLOGY DETAILS ----------
Topology Name: flux-hive-streaming-example
--------------- SPOUTS ---------------
stockSpout [1] (storm.kafka.KafkaSpout)
---------------- BOLTS ---------------
stockDataBolt [1] (storm_hive_streaming_example.StockDataBolt)
stockHiveBolt [1] (org.apache.storm.hive.bolt.HiveBolt)
--------------- STREAMS ---------------
stockSpout --SHUFFLE--> stockDataBolt
stockDataBolt --SHUFFLE--> stockHiveBolt
--------------------------------------
5274 [main] INFO  org.apache.storm.flux.Flux - Running remotely...
5274 [main] INFO  org.apache.storm.flux.Flux - Deploying topology in an ACTIVE state...
5331 [main] INFO  backtype.storm.utils.Utils - Using defaults.yaml from resources
5362 [main] INFO  backtype.storm.utils.Utils - Using storm.yaml from resources
5425 [main] INFO  backtype.storm.StormSubmitter - Generated ZooKeeper secret payload for MD5-digest: -7511426287572049575:-6167598404490844446
5433 [main] INFO  backtype.storm.security.auth.AuthUtils - Got AutoCreds []
5575 [main] INFO  backtype.storm.utils.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [2000] the maxSleepTimeMs [60000] the maxRetries [5]
5724 [main] INFO  backtype.storm.utils.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [2000] the maxSleepTimeMs [60000] the maxRetries [5]
5725 [main] INFO  backtype.storm.utils.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [2000] the maxSleepTimeMs [60000] the maxRetries [5]
5754 [main] INFO  backtype.storm.utils.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [2000] the maxSleepTimeMs [60000] the maxRetries [5]
5783 [main] INFO  backtype.storm.utils.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [2000] the maxSleepTimeMs [60000] the maxRetries [5]
5805 [main] INFO  backtype.storm.utils.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [2000] the maxSleepTimeMs [60000] the maxRetries [5]
5854 [main] INFO  backtype.storm.StormSubmitter - Uploading topology jar storm-hive-streaming-example-1.0-SNAPSHOT.jar to assigned location: /hadoop/storm/nimbus/inbox/stormjar-169cb3cc-40d8-48c0-97e0-d3c971a5b63f.jar
21638 [main] INFO  backtype.storm.StormSubmitter - Successfully uploaded topology jar to assigned location: /hadoop/storm/nimbus/inbox/stormjar-169cb3cc-40d8-48c0-97e0-d3c971a5b63f.jar
21639 [main] INFO  backtype.storm.StormSubmitter - Submitting topology flux-hive-streaming-example in distributed mode with conf {"topology.workers":2,"storm.zookeeper.topology.auth.scheme":"digest","storm.zookeeper.topology.auth.payload":"-7511426287572049575:-6167598404490844446"}
23410 [main] INFO  backtype.storm.StormSubmitter - Finished submitting topology: flux-hive-streaming-example
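
The two FluxParser lines in the output above ("Not performing property substitution" and "Not performing environment variable substitution") refer to an optional Flux feature: values in the YAML file can be written as ${...} placeholders and filled in at submit time from a properties file passed with --filter. As a rough sketch, the Zookeeper address from the components section could be externalized like this; the property key kafka.zookeeper.hosts and the file name dev.properties are made up for illustration:

```yaml
components:
  - id: "zkHosts"
    className: "storm.kafka.ZkHosts"
    constructorArgs:
      # replaced with the value of kafka.zookeeper.hosts (hypothetical key)
      # from the file given via --filter, e.g. a dev.properties containing
      # the line: kafka.zookeeper.hosts=127.0.0.1:2181
      - "${kafka.zookeeper.hosts}"
```

The storm jar command shown earlier would then carry an additional --filter dev.properties argument, and the same YAML file could be reused across environments.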

Publishing data to the Kafka topic for stocks:

$ java -cp storm-hive-streaming-example-1.0-SNAPSHOT.jar storm_hive_streaming_example.KafkaStockProducer
