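In this chapter, we will discuss a real-time application of Apache Storm. We will see how Storm is used with Twitter.
Twitter is an online social networking service that provides a platform to send and receive user tweets. Registered users can read and post tweets, but unregistered users can only read them. Hashtags are used to categorize tweets by keyword by prefixing the relevant keyword with #. Now let us take a real-time scenario: finding the most used hashtags per topic.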
Spout Creation
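The purpose of the spout is to get the tweets submitted by people as soon as possible. Twitter provides the "Twitter Streaming API", a web service based tool for retrieving the tweets submitted by people in real time. The Twitter Streaming API can be accessed from any programming language.
twitter4j is an open source, unofficial Java library that provides a Java-based module for easy access to the Twitter Streaming API. twitter4j provides a listener-based framework for accessing tweets. To access the Twitter Streaming API, we need to sign in to a Twitter developer account and obtain the following OAuth authentication details.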
ConsumerKey
ConsumerSecret
AccessToken
AccessTokenSecret
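Storm provides a Twitter spout, TwitterSampleSpout, in its starter kit. We will use it to retrieve the tweets. The spout needs the OAuth authentication details and one or more keywords, and it emits the real-time tweets that match those keywords. If no keyword is given, the code below falls back to Twitter's sample stream. The complete program code is given below.
Coding: TwitterSampleSpout.java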
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

import twitter4j.FilterQuery;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.conf.ConfigurationBuilder;

import backtype.storm.Config;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

@SuppressWarnings("serial")
public class TwitterSampleSpout extends BaseRichSpout {
   SpoutOutputCollector _collector;
   // Buffer between the twitter4j listener thread and nextTuple()
   LinkedBlockingQueue<Status> queue = null;
   TwitterStream _twitterStream;

   String consumerKey;
   String consumerSecret;
   String accessToken;
   String accessTokenSecret;
   String[] keyWords;

   public TwitterSampleSpout(String consumerKey, String consumerSecret,
         String accessToken, String accessTokenSecret, String[] keyWords) {
      this.consumerKey = consumerKey;
      this.consumerSecret = consumerSecret;
      this.accessToken = accessToken;
      this.accessTokenSecret = accessTokenSecret;
      this.keyWords = keyWords;
   }

   public TwitterSampleSpout() {
      // Default constructor; credentials and keywords remain unset
   }

   @Override
   public void open(Map conf, TopologyContext context,
         SpoutOutputCollector collector) {
      queue = new LinkedBlockingQueue<Status>(1000);
      _collector = collector;

      StatusListener listener = new StatusListener() {
         @Override
         public void onStatus(Status status) {
            // offer() does not block; the tweet is dropped if the queue is full
            queue.offer(status);
         }

         @Override
         public void onDeletionNotice(StatusDeletionNotice sdn) {}

         @Override
         public void onTrackLimitationNotice(int i) {}

         @Override
         public void onScrubGeo(long l, long l1) {}

         @Override
         public void onException(Exception ex) {}

         @Override
         public void onStallWarning(StallWarning arg0) {}
      };

      ConfigurationBuilder cb = new ConfigurationBuilder();
      cb.setDebugEnabled(true)
         .setOAuthConsumerKey(consumerKey)
         .setOAuthConsumerSecret(consumerSecret)
         .setOAuthAccessToken(accessToken)
         .setOAuthAccessTokenSecret(accessTokenSecret);

      _twitterStream = new TwitterStreamFactory(cb.build()).getInstance();
      _twitterStream.addListener(listener);

      if (keyWords.length == 0) {
         // No keywords: read the sample stream
         _twitterStream.sample();
      } else {
         // Only receive tweets that match the keywords
         FilterQuery query = new FilterQuery().track(keyWords);
         _twitterStream.filter(query);
      }
   }

   @Override
   public void nextTuple() {
      Status ret = queue.poll();

      if (ret == null) {
         // Nothing buffered yet; back off briefly
         Utils.sleep(50);
      } else {
         _collector.emit(new Values(ret));
      }
   }

   @Override
   public void close() {
      _twitterStream.shutdown();
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      Config ret = new Config();
      ret.setMaxTaskParallelism(1);
      return ret;
   }

   @Override
   public void ack(Object id) {}

   @Override
   public void fail(Object id) {}

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("tweet"));
   }
}
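The spout above hands tweets from the twitter4j listener to nextTuple() through a bounded queue, using the non-blocking offer() and poll() calls. The following is a minimal sketch of that hand-off in plain Java, with strings standing in for tweets. The class and method names (QueueHandoffSketch, onMessage, nextOrNull) are hypothetical and exist only for illustration; it does not use Storm or twitter4j.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the listener -> nextTuple() hand-off in the spout.
class QueueHandoffSketch {
    // Bounded buffer, like the spout's queue (capacity 1000 there; configurable here).
    private final LinkedBlockingQueue<String> queue;

    QueueHandoffSketch(int capacity) {
        queue = new LinkedBlockingQueue<String>(capacity);
    }

    // Plays the role of onStatus(): offer() never blocks and returns false when the queue is full.
    boolean onMessage(String message) {
        return queue.offer(message);
    }

    // Plays the role of nextTuple(): poll() never blocks and returns null when the queue is empty.
    String nextOrNull() {
        return queue.poll();
    }

    public static void main(String[] args) {
        QueueHandoffSketch sketch = new QueueHandoffSketch(2);
        System.out.println(sketch.onMessage("tweet-1")); // accepted
        System.out.println(sketch.onMessage("tweet-2")); // accepted
        System.out.println(sketch.onMessage("tweet-3")); // rejected, the buffer is full
        System.out.println(sketch.nextOrNull());         // oldest buffered message
    }
}
```

Because neither call blocks, a slow topology cannot stall the listener thread; the cost is that tweets arriving while the buffer is full are silently dropped.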
Hashtag Reader Bolt
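The tweets emitted by the spout are forwarded to HashtagReaderBolt, which processes each tweet and emits all of its hashtags. HashtagReaderBolt uses the getHashtagEntities method provided by twitter4j. getHashtagEntities reads the tweet and returns its hashtags as an array of HashtagEntity objects. The complete program code is as follows:
Coding: HashtagReaderBolt.java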
import java.util.Map;

import twitter4j.HashtagEntity;
import twitter4j.Status;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class HashtagReaderBolt implements IRichBolt {
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context,
         OutputCollector collector) {
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      Status tweet = (Status) tuple.getValueByField("tweet");

      // Emit one tuple per hashtag found in the tweet
      for (HashtagEntity hashtag : tweet.getHashtagEntities()) {
         System.out.println("Hashtag: " + hashtag.getText());
         this.collector.emit(new Values(hashtag.getText()));
      }
   }

   @Override
   public void cleanup() {}

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("hashtag"));
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}
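To try the "one tweet in, several hashtags out" step without Twitter credentials, the extraction can be approximated on plain text. The sketch below is not how twitter4j works: it swaps in a simple regular expression, and it assumes a hashtag is a # followed by letters, digits, or underscores. The class name HashtagExtractSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical regex-based stand-in for hashtag extraction on raw tweet text.
class HashtagExtractSketch {
    // Simplifying assumption: '#' followed by one or more word characters.
    private static final Pattern HASHTAG = Pattern.compile("#(\\w+)");

    static List<String> extract(String text) {
        List<String> tags = new ArrayList<String>();
        Matcher matcher = HASHTAG.matcher(text);
        while (matcher.find()) {
            // Keep the text without the leading '#', as the bolt's emitted values do.
            tags.add(matcher.group(1));
        }
        return tags;
    }

    public static void main(String[] args) {
        for (String tag : extract("Sunday lunch #food and #cooking!")) {
            System.out.println("Hashtag: " + tag);
        }
    }
}
```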
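Hashtag Counter Bolt
The emitted hashtags are forwarded to HashtagCounterBolt. This bolt processes all the hashtags and keeps each hashtag and its count in memory using a Java Map object. The complete program code is given below.
Coding: HashtagCounterBolt.java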
import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class HashtagCounterBolt implements IRichBolt {
   // Hashtag -> number of times it has been seen
   Map<String, Integer> counterMap;
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context,
         OutputCollector collector) {
      this.counterMap = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String key = tuple.getString(0);

      if (!counterMap.containsKey(key)) {
         counterMap.put(key, 1);
      } else {
         Integer c = counterMap.get(key) + 1;
         counterMap.put(key, c);
      }

      collector.ack(tuple);
   }

   @Override
   public void cleanup() {
      // Print the final counts when the topology shuts down
      for (Map.Entry<String, Integer> entry : counterMap.entrySet()) {
         System.out.println("Result: " + entry.getKey() + " : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("hashtag"));
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}
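The counter bolt prints its map in no particular order, while the stated goal is to find the most used hashtags. One possible way to rank the counts is sketched below in plain Java. The class HashtagTopNSketch and its add and top methods are hypothetical helpers, not part of the Storm starter kit.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: same counting idea as the bolt, plus a ranking step.
class HashtagTopNSketch {
    private final Map<String, Integer> counts = new HashMap<String, Integer>();

    // Same effect as the containsKey/put logic in the bolt's execute().
    void add(String hashtag) {
        counts.merge(hashtag, 1, Integer::sum);
    }

    // Returns up to n entries, highest count first; ties are ordered alphabetically.
    List<Map.Entry<String, Integer>> top(int n) {
        List<Map.Entry<String, Integer>> entries =
                new ArrayList<Map.Entry<String, Integer>>(counts.entrySet());
        entries.sort((a, b) -> {
            int byCount = Integer.compare(b.getValue(), a.getValue());
            return byCount != 0 ? byCount : a.getKey().compareTo(b.getKey());
        });
        return new ArrayList<Map.Entry<String, Integer>>(
                entries.subList(0, Math.min(n, entries.size())));
    }

    public static void main(String[] args) {
        HashtagTopNSketch sketch = new HashtagTopNSketch();
        for (String tag : new String[] {"food", "cook", "food", "android"}) {
            sketch.add(tag);
        }
        for (Map.Entry<String, Integer> entry : sketch.top(2)) {
            System.out.println("Result: " + entry.getKey() + " : " + entry.getValue());
        }
    }
}
```

In the bolt, a ranking like this could be applied inside cleanup() before printing, at the cost of sorting the whole map once.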
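Submitting a Topology
Submitting the topology is the main application. The Twitter topology consists of TwitterSampleSpout, HashtagReaderBolt, and HashtagCounterBolt. The following program code shows how to submit the topology.
Coding: TwitterHashtagStorm.java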
import java.util.Arrays;

import backtype.storm.tuple.Fields;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

public class TwitterHashtagStorm {
   public static void main(String[] args) throws Exception {
      // The first four arguments are the OAuth details
      String consumerKey = args[0];
      String consumerSecret = args[1];
      String accessToken = args[2];
      String accessTokenSecret = args[3];

      // All remaining arguments are the keywords to track
      String[] keyWords = Arrays.copyOfRange(args, 4, args.length);

      Config config = new Config();
      config.setDebug(true);

      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("twitter-spout", new TwitterSampleSpout(consumerKey,
         consumerSecret, accessToken, accessTokenSecret, keyWords));

      builder.setBolt("twitter-hashtag-reader-bolt", new HashtagReaderBolt())
         .shuffleGrouping("twitter-spout");

      // Group by hashtag so that the same hashtag always reaches the same counter task
      builder.setBolt("twitter-hashtag-counter-bolt", new HashtagCounterBolt())
         .fieldsGrouping("twitter-hashtag-reader-bolt", new Fields("hashtag"));

      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("TwitterHashtagStorm", config,
         builder.createTopology());

      // Let the topology run for 10 seconds, then stop it
      Thread.sleep(10000);
      cluster.shutdown();
   }
}
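The counter bolt is wired with fieldsGrouping on the "hashtag" field. With the default parallelism of one task this makes no visible difference, but if the bolt ran as several tasks, the grouping would keep every occurrence of a given hashtag on the same task, so each in-memory map holds the complete count for its hashtags. The sketch below is a simplified model of that idea, assuming a plain hash-modulo rule; it is not Storm's actual routing code, and FieldsGroupingSketch and taskFor are hypothetical names.

```java
// Hypothetical, simplified model of routing by field value; not Storm's implementation.
class FieldsGroupingSketch {
    // Assumption: the task index is derived from the hash of the grouping field.
    static int taskFor(String hashtag, int numTasks) {
        // floorMod keeps the result in [0, numTasks) even for negative hash codes.
        return Math.floorMod(hashtag.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 4;
        for (String tag : new String[] {"food", "cook", "food", "android", "food"}) {
            // Every "food" line reports the same task index.
            System.out.println(tag + " -> task " + taskFor(tag, numTasks));
        }
    }
}
```

By contrast, the reader bolt uses shuffleGrouping, which is fine there because extracting hashtags from one tweet does not depend on any other tweet.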
Building and Running the Application
The complete application has four Java source files. They are as follows:
TwitterSampleSpout.java
HashtagReaderBolt.java
HashtagCounterBolt.java
TwitterHashtagStorm.java
You can compile the application using the following command:
javac -cp "/path/to/storm/apache-storm-0.9.5/lib/*":"/path/to/twitter4j/lib/*" *.java
Execute the application using the following command:
java -cp "/path/to/storm/apache-storm-0.9.5/lib/*":"/path/to/twitter4j/lib/*":. TwitterHashtagStorm…
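The main class reads its first four command-line arguments as the OAuth details and treats any remaining arguments as keywords, and it fails with an ArrayIndexOutOfBoundsException if fewer than four are supplied. A small validation step, sketched below, would give a clearer error. TopologyArgsSketch is a hypothetical helper, not part of the application above.

```java
import java.util.Arrays;

// Hypothetical helper that splits the command-line arguments the way main() does.
class TopologyArgsSketch {
    // Order assumed from main(): consumerKey, consumerSecret, accessToken, accessTokenSecret.
    final String[] credentials;
    // Everything after the first four arguments; may be empty.
    final String[] keyWords;

    TopologyArgsSketch(String[] args) {
        if (args.length < 4) {
            throw new IllegalArgumentException(
                "expected at least 4 arguments (OAuth details), got " + args.length);
        }
        credentials = Arrays.copyOfRange(args, 0, 4);
        keyWords = Arrays.copyOfRange(args, 4, args.length);
    }

    public static void main(String[] args) {
        TopologyArgsSketch parsed = new TopologyArgsSketch(
            new String[] {"key", "secret", "token", "tokenSecret", "food", "music"});
        System.out.println("keywords: " + Arrays.toString(parsed.keyWords));
    }
}
```

When the keyword list comes back empty, the spout shown earlier subscribes to the sample stream instead of filtering.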
Output
The application prints the currently available hashtags and their counts. The output should be similar to the following:
Result: jazztastic : 1
Result: foodie : 1
Result: Redskins : 1
Result: Recipe : 1
Result: cook : 1
Result: android : 1
Result: food : 2
Result: NoToxicHorseMeat : 1
Result: Purrs4Peace : 1
Result: livemusic : 1
Result: VIPremium : 1
Result: Frome : 1
Result: SundayRoast : 1
Result: Millennials : 1
Result: HealthWithKier : 1
Result: LPs30DaysofGratitude : 1
Result: cooking : 1
Result: gameinsight : 1
Result: Countryfile : 1
Result: androidgames : 1