With Flux for Apache Storm, deploying streaming topologies for real-time processing becomes less programmatic and more declarative. Using Flux for deployments makes it less likely that you will have to re-compile your project just because you re-configured or re-arranged your topology. It leverages YAML, a human-readable serialization format, to describe a topology as a whole. You might still need to write some classes, but by taking advantage of existing, generic implementations this becomes less likely.
While Flux can also be used with an existing topology, for this post we’ll take the Hive Streaming example you can find here (blog post) and create the required topology from scratch using Flux. For experiments and demo purposes you can use the following Vagrant setup to run an HDP cluster locally.
Making Flux Part of Your Project
The main Flux class can be used to deploy a topology described in YAML notation to Storm. Topologies are still deployed with the storm executable; the only difference is that Flux takes the place of a custom main class describing the topology. In order to ship Flux with an existing project it needs to be part of the classpath. Maven can be used to achieve this:
<dependency>
    <groupId>com.github.ptgoetz</groupId>
    <artifactId>flux-core</artifactId>
    <version>0.3.0</version>
</dependency>
Additionally we can set Flux as the main class using the Maven Shade plugin as follows:
<plugins>
    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>1.4</version>
        ...
        <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
            <mainClass>org.apache.storm.flux.Flux</mainClass>
        </transformer>
        ...
    </plugin>
</plugins>
Flux YAML Topology
Essentially a topology consists of six core parts: a name, a topology-specific config, class components, spouts, bolts, and a streams definition. All these parts can be configured in a single YAML file:
name: "flux-hive-streaming-example" config: topology.workers: 2 # defining required components components: ... # spout definitions spouts: ... # bolt definitions bolts: .... # stream definitions streams: ... # coupling of bolts and spouts to form a streaming topology
The topology relies on certain components for streaming stock prices to Hive. For one, it relies on the Hive Streaming bolt that comes with Storm. The other component is the Kafka spout, as stock prices get published to the stock_topic topic. The Kafka spout also needs Zookeeper, which is likewise configured in the components section of the topology.
ZkHosts takes a comma-separated list of nodes as a single string as a constructor argument. This can be provided using the constructorArgs configuration. The configuration of Zookeeper looks as follows:
- id: "zkHosts" className: "storm.kafka.ZkHosts" constructorArgs: - "127.0.0.1:2181"
The Zookeeper component can then be referenced from the Kafka spout configuration, which needs a ZkHosts instance as a constructor argument. Configuring our Kafka spout:
- id: "stockSpoutConfig" className: "storm.kafka.SpoutConfig" constructorArgs: - ref: "zkHosts" # brokerHosts - "stock_topic" # topic - "/kafka_storm" # zkRoot - "StormSpout" # id
For Hive Streaming we need a HiveOptions instance to define batch sizes and timeouts. The most important part, however, is the definition of the column fields, including the partition fields. For this the DelimitedRecordHiveMapper needs to be configured carefully. The configuration for Hive Streaming can be found below:
- id: "stockPartitionFields" className: "backtype.storm.tuple.Fields" constructorArgs: - ["name"] - id: "stockColumnFields" className: "backtype.storm.tuple.Fields" constructorArgs: - ["day", "open", "high", "low", "close", "volume","adj_close"] - id: "stockDelimitedRecordHiveMapper" className: "org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper" configMethods: - name: "withPartitionFields" args: - ref: "stockPartitionFields" - name: "withColumnFields" args: - ref: "stockColumnFields" - id: "stockHiveOptions" className: "org.apache.storm.hive.common.HiveOptions" constructorArgs: - "thrift://one.hdp:9083" # metaStoreURI - "default" # databaseName - "stock_prices" # tableName - ref: "stockDelimitedRecordHiveMapper" # HiveMapper configMethods: - name: "withTxnsPerBatch" args: - 2 - name: "withBatchSize" args: - 100 - name: "withIdleTimeout" args: - 10
After the stock information is read from the Kafka topic, a simple bolt splits each line and emits the individual fields towards Hive. The StockDataBolt does not take any constructor arguments and can be configured directly in the bolts section (a sketch of what such a bolt could look like follows the stream definition below). With this we have all the required configuration to define our stream:
# spout definitions
spouts:
  - id: "stockSpout"
    className: "storm.kafka.KafkaSpout"
    parallelism: 1
    constructorArgs:
      - ref: "stockSpoutConfig"

# bolt definitions
bolts:
  - id: "stockDataBolt"
    className: "storm_hive_streaming_example.StockDataBolt"
  - id: "stockHiveBolt"
    className: "org.apache.storm.hive.bolt.HiveBolt"
    constructorArgs:
      - ref: "stockHiveOptions"

# stream definitions
streams:
  - name: "stockSpout --> stockDataBolt"    # name isn't used (placeholder for logging, UI, etc.)
    from: "stockSpout"
    to: "stockDataBolt"
    grouping:
      type: SHUFFLE

  - name: "stockDataBolt --> stockHiveBolt" # name not used
    from: "stockDataBolt"
    to: "stockHiveBolt"
    grouping:
      type: SHUFFLE
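The StockDataBolt referenced above belongs to the example project, not to Storm itself. Below is a minimal sketch of what such a bolt could look like, assuming the Kafka messages are comma-separated lines in the order name,day,open,high,low,close,volume,adj_close and that the spout is configured with a String scheme; the actual implementation in the example project may differ. The important part is that the declared output fields match the column and partition fields configured for the DelimitedRecordHiveMapper.

package storm_hive_streaming_example;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class StockDataBolt extends BaseBasicBolt {

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // the raw Kafka message as a single string field (assumes a String scheme on the spout)
        String line = input.getString(0);
        // assumed CSV order: name,day,open,high,low,close,volume,adj_close
        String[] parts = line.split(",");
        // emit the column fields followed by the partition field
        collector.emit(new Values(parts[1], parts[2], parts[3], parts[4],
                parts[5], parts[6], parts[7], parts[0]));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // must match the column and partition fields of the DelimitedRecordHiveMapper
        declarer.declare(new Fields("day", "open", "high", "low", "close",
                "volume", "adj_close", "name"));
    }
}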
Deploying the Topology
Each topology is still deployed in the same manner using storm jar, only the main class is now Flux. The deployment can be local by supplying --local as a parameter, or remote with --remote. A full list of options can be seen here.
$ storm jar storm-hive-streaming-example-1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --remote flux-demo.yaml
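For testing without a running cluster, the same topology can be deployed in local mode by replacing --remote with --local, which runs it in an in-process local cluster:

$ storm jar storm-hive-streaming-example-1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --local flux-demo.yaml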
The expected output of the remote deployment can be seen below and should describe the kind of topology that was deployed:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/hdp/2.3.0.0-2557/storm/lib/logback-classic-1.0.13.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/vagrant/storm-hive-streaming-example-1.0-SNAPSHOT.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder]
███████╗██╗     ██╗   ██╗██╗  ██╗
██╔════╝██║     ██║   ██║╚██╗██╔╝
█████╗  ██║     ██║   ██║ ╚███╔╝
██╔══╝  ██║     ██║   ██║ ██╔██╗
██║     ███████╗╚██████╔╝██╔╝ ██╗
╚═╝     ╚══════╝ ╚═════╝ ╚═╝  ╚═╝
+- Apache Storm -+
+- data FLow User eXperience -+
Version: 0.3.0
Parsing file: /home/vagrant/flux-demo.yaml
2121 [main] INFO org.apache.storm.flux.parser.FluxParser - loading YAML from input stream...
2153 [main] INFO org.apache.storm.flux.parser.FluxParser - Not performing property substitution.
2154 [main] INFO org.apache.storm.flux.parser.FluxParser - Not performing environment variable substitution.
3622 [main] WARN org.apache.storm.flux.FluxBuilder - Found multiple invokable constructors for class class backtype.storm.tuple.Fields, given arguments [[name]]. Using the last one found.
3625 [main] WARN org.apache.storm.flux.FluxBuilder - Found multiple invokable constructors for class class backtype.storm.tuple.Fields, given arguments [[day, open, high, low, close, volume, adj_close]]. Using the last one found.
3641 [main] INFO org.apache.storm.flux.FluxBuilder - Detected DSL topology...
4423 [main] INFO backtype.storm.utils.Utils - Using defaults.yaml from resources
4559 [main] INFO backtype.storm.utils.Utils - Using storm.yaml from resources
---------- TOPOLOGY DETAILS ----------
Topology Name: flux-hive-streaming-example
--------------- SPOUTS ---------------
stockSpout [1] (storm.kafka.KafkaSpout)
---------------- BOLTS ---------------
stockDataBolt [1] (storm_hive_streaming_example.StockDataBolt)
stockHiveBolt [1] (org.apache.storm.hive.bolt.HiveBolt)
--------------- STREAMS ---------------
stockSpout --SHUFFLE--> stockDataBolt
stockDataBolt --SHUFFLE--> stockHiveBolt
--------------------------------------
5274 [main] INFO org.apache.storm.flux.Flux - Running remotely...
5274 [main] INFO org.apache.storm.flux.Flux - Deploying topology in an ACTIVE state...
5331 [main] INFO backtype.storm.utils.Utils - Using defaults.yaml from resources
5362 [main] INFO backtype.storm.utils.Utils - Using storm.yaml from resources
5425 [main] INFO backtype.storm.StormSubmitter - Generated ZooKeeper secret payload for MD5-digest: -7511426287572049575:-6167598404490844446
5433 [main] INFO backtype.storm.security.auth.AuthUtils - Got AutoCreds []
5575 [main] INFO backtype.storm.utils.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [2000] the maxSleepTimeMs [60000] the maxRetries [5]
5724 [main] INFO backtype.storm.utils.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [2000] the maxSleepTimeMs [60000] the maxRetries [5]
5725 [main] INFO backtype.storm.utils.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [2000] the maxSleepTimeMs [60000] the maxRetries [5]
5754 [main] INFO backtype.storm.utils.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [2000] the maxSleepTimeMs [60000] the maxRetries [5]
5783 [main] INFO backtype.storm.utils.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [2000] the maxSleepTimeMs [60000] the maxRetries [5]
5805 [main] INFO backtype.storm.utils.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [2000] the maxSleepTimeMs [60000] the maxRetries [5]
5854 [main] INFO backtype.storm.StormSubmitter - Uploading topology jar storm-hive-streaming-example-1.0-SNAPSHOT.jar to assigned location: /hadoop/storm/nimbus/inbox/stormjar-169cb3cc-40d8-48c0-97e0-d3c971a5b63f.jar
21638 [main] INFO backtype.storm.StormSubmitter - Successfully uploaded topology jar to assigned location: /hadoop/storm/nimbus/inbox/stormjar-169cb3cc-40d8-48c0-97e0-d3c971a5b63f.jar
21639 [main] INFO backtype.storm.StormSubmitter - Submitting topology flux-hive-streaming-example in distributed mode with conf {"topology.workers":2,"storm.zookeeper.topology.auth.scheme":"digest","storm.zookeeper.topology.auth.payload":"-7511426287572049575:-6167598404490844446"}
23410 [main] INFO backtype.storm.StormSubmitter - Finished submitting topology: flux-hive-streaming-example
Publishing data to the Kafka topic for stocks:
$ java -cp storm-hive-streaming-example-1.0-SNAPSHOT.jar storm_hive_streaming_example.KafkaStockProducer
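The KafkaStockProducer is also part of the example project. Below is a minimal sketch of what such a producer could look like, assuming a Kafka broker at one.hdp:6667 and a local CSV file stock_prices.csv as input (both the broker address and the input file are assumptions; the actual class may obtain its data differently).

package storm_hive_streaming_example;

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaStockProducer {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "one.hdp:6667"); // assumed broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        // publish every CSV line of stock data to the topic the Kafka spout listens on
        try (BufferedReader reader = new BufferedReader(new FileReader("stock_prices.csv"))) {
            String line;
            while ((line = reader.readLine()) != null) {
                producer.send(new ProducerRecord<String, String>("stock_topic", line));
            }
        } finally {
            producer.close();
        }
    }
}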
Further Readings
- Storm Flux
- SnakeYAML
- Storm Hive Streaming Example
- Storm Blueprints (Amazon)
- Storm Real-Time Processing Cookbook (Amazon)
- Apache Kafka (Amazon)