开发手册 欢迎您!
软件开发者资料库

Apache Storm in Twitter

Twitter中的Apache Storm - 从简介,核心概念,集群架构,工作流,分布式消息系统,安装,工作示例,Trident,Twitter中的Apache Storm,Yahoo!开始,简单易学地学习Apache Storm。财务,应用。

在本章中,我们将讨论Apache Storm的实时应用.我们将看看在Twitter上如何使用Storm.

Twitter

Twitter是一种在线社交网络服务,提供发送和接收用户推文的平台.注册用户可以阅读和发布推文,但未注册的用户只能阅读推文. Hashtag用于通过在相关关键字之前附加#来按关键字对推文进行分类.现在让我们实时查看每个主题最常用的主题标签.

Spout Creation

spout的目的是获取人们尽快提交的推文. Twitter提供"Twitter Streaming API",这是一个基于Web服务的工具,用于检索人们实时提交的推文.可以使用任何编程语言访问Twitter Streaming API.

twitter4j 是一个开源的非官方Java库,它提供了一个基于Java的模块,可以轻松访问Twitter流式API. twitter4j 提供了一个基于监听器的框架来访问推文.要访问Twitter Streaming API,我们需要登录Twitter开发者帐户,并且应该获得以下OAuth身份验证详细信息.

  • Customerkey

  • CustomerSecret

  • AccessToken

  • AccessTookenSecret

Storm在其入门套件中提供了一个twitter spout, TwitterSampleSpout,.我们将使用它来检索推文. spout需要OAuth身份验证详细信息和至少一个关键字. spout将根据关键字发出实时推文.完整的程序代码如下所示.

编码:TwitterSampleSpout.java

import java.util.Map;import java.util.concurrent.LinkedBlockingQueue;import twitter4j.FilterQuery;import twitter4j.StallWarning;import twitter4j.Status;import twitter4j.StatusDeletionNotice;import twitter4j.StatusListener;import twitter4j.TwitterStream;import twitter4j.TwitterStreamFactory;import twitter4j.auth.AccessToken;import twitter4j.conf.ConfigurationBuilder;import backtype.storm.Config;import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichSpout;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import backtype.storm.utils.Utils;@SuppressWarnings("serial")public class TwitterSampleSpout extends BaseRichSpout {   SpoutOutputCollector _collector;   LinkedBlockingQueue queue = null;   TwitterStream _twitterStream;   String consumerKey;   String consumerSecret;   String accessToken;   String accessTokenSecret;   String[] keyWords;   public TwitterSampleSpout(String consumerKey, String consumerSecret,      String accessToken, String accessTokenSecret, String[] keyWords) {         this.consumerKey = consumerKey;         this.consumerSecret = consumerSecret;         this.accessToken = accessToken;         this.accessTokenSecret = accessTokenSecret;         this.keyWords = keyWords;   }   public TwitterSampleSpout() {      // TODO Auto-generated constructor stub   }   @Override   public void open(Map conf, TopologyContext context,      SpoutOutputCollector collector) {         queue = new LinkedBlockingQueue(1000);         _collector = collector;         StatusListener listener = new StatusListener() {            @Override            public void onStatus(Status status) {               queue.offer(status);            }            @Override            public void onDeletionNotice(StatusDeletionNotice sdn) {}            @Override            public void onTrackLimitationNotice(int i) {}            @Override            public void onScrubGeo(long l, long l1) {}            @Override            public void onException(Exception ex) {}            @Override            public void onStallWarning(StallWarning arg0) {               // TODO Auto-generated method stub            }         };         ConfigurationBuilder cb = new ConfigurationBuilder();         cb.setDebugEnabled(true)            .setOAuthConsumerKey(consumerKey)            .setOAuthConsumerSecret(consumerSecret)            .setOAuthAccessToken(accessToken)            .setOAuthAccessTokenSecret(accessTokenSecret);         _twitterStream = new TwitterStreamFactory(cb.build()).getInstance();         _twitterStream.addListener(listener);         if (keyWords.length == 0) {            _twitterStream.sample();         }else {            FilterQuery query = new FilterQuery().track(keyWords);            _twitterStream.filter(query);         }   }   @Override   public void nextTuple() {      Status ret = queue.poll();      if (ret == null) {         Utils.sleep(50);      } else {         _collector.emit(new Values(ret));      }   }   @Override   public void close() {      _twitterStream.shutdown();   }   @Override   public Map getComponentConfiguration() {      Config ret = new Config();      ret.setMaxTaskParallelism(1);      return ret;   }   @Override   public void ack(Object id) {}   @Override   public void fail(Object id) {}   @Override   public void declareOutputFields(OutputFieldsDeclarer declarer) {      declarer.declare(new Fields("tweet"));   }}

Hashtag Reader Bolt

spout发出的推文将被转发 HashtagReaderBolt ,它将处理推文并发出所有可用的主题标签. HashtagReaderBolt使用twitter4j提供的 getHashTagEntities 方法. getHashTagEntities读取推文并返回主题标签列表.完整的程序代码如下 :

编码:HashtagReaderBolt.java

import java.util.HashMap;import java.util.Map;import twitter4j.*;import twitter4j.conf.*;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.IRichBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Tuple;public class HashtagReaderBolt implements IRichBolt {   private OutputCollector collector;   @Override   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {      this.collector = collector;   }   @Override   public void execute(Tuple tuple) {      Status tweet = (Status) tuple.getValueByField("tweet");      for(HashtagEntity hashtage : tweet.getHashtagEntities()) {         System.out.println("Hashtag: " + hashtage.getText());         this.collector.emit(new Values(hashtage.getText()));      }   }   @Override   public void cleanup() {}   @Override   public void declareOutputFields(OutputFieldsDeclarer declarer) {      declarer.declare(new Fields("hashtag"));   }   @Override   public Map getComponentConfiguration() {      return null;   }}

标签计数器螺栓

发出的标签将转发 HashtagCounterBolt .这个bolt将处理所有的hashtags并使用Java Map对象将每个hashtag及其计数保存在内存中.完整的程序代码如下所示.

编码:HashtagCounterBolt.java

import java.util.HashMap;import java.util.Map;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.IRichBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Tuple;public class HashtagCounterBolt implements IRichBolt {   Map counterMap;   private OutputCollector collector;   @Override   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {      this.counterMap = new HashMap();      this.collector = collector;   }   @Override   public void execute(Tuple tuple) {      String key = tuple.getString(0);      if(!counterMap.containsKey(key)){         counterMap.put(key, 1);      }else{         Integer c = counterMap.get(key) + 1;         counterMap.put(key, c);      }      collector.ack(tuple);   }   @Override   public void cleanup() {      for(Map.Entry entry:counterMap.entrySet()){         System.out.println("Result: " + entry.getKey()+" : " + entry.getValue());      }   }   @Override   public void declareOutputFields(OutputFieldsDeclarer declarer) {      declarer.declare(new Fields("hashtag"));   }   @Override   public Map getComponentConfiguration() {      return null;   }}

提交拓扑

提交拓扑是主要的应用. Twitter拓扑由 TwitterSampleSpout HashtagReaderBolt HashtagCounterBolt 组成.以下程序代码显示了如何提交拓扑.

编码:TwitterHashtagStorm.java

import java.util.*;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import backtype.storm.Config;import backtype.storm.LocalCluster;import backtype.storm.topology.TopologyBuilder;public class TwitterHashtagStorm {   public static void main(String[] args) throws Exception{      String consumerKey = args[0];      String consumerSecret = args[1];      String accessToken = args[2];      String accessTokenSecret = args[3];      String[] arguments = args.clone();      String[] keyWords = Arrays.copyOfRange(arguments, 4, arguments.length);      Config config = new Config();      config.setDebug(true);      TopologyBuilder builder = new TopologyBuilder();      builder.setSpout("twitter-spout", new TwitterSampleSpout(consumerKey,         consumerSecret, accessToken, accessTokenSecret, keyWords));      builder.setBolt("twitter-hashtag-reader-bolt", new HashtagReaderBolt())         .shuffleGrouping("twitter-spout");      builder.setBolt("twitter-hashtag-counter-bolt", new HashtagCounterBolt())         .fieldsGrouping("twitter-hashtag-reader-bolt", new Fields("hashtag"));      LocalCluster cluster = new LocalCluster();      cluster.submitTopology("TwitterHashtagStorm", config,         builder.createTopology());      Thread.sleep(10000);      cluster.shutdown();   }}

构建并运行应用程序

完整的应用程序有四个Java码.它们如下<

  • TwitterSampleSpout.java

  • HashtagReaderBolt.java

  • HashtagCounterBolt.java

  • TwitterHashtagStorm.java

您可以使用以下命令编译应用程序以下命令 :

javac -cp"/path/to/storm/apache-storm-0.9.5/lib/*":"/path/to/twitter4j/lib/*"* .java

使用以下命令执行应用程序 :

javac -cp "/path/to/storm/apache-storm-0.9.5/lib/*":"/path/to/twitter4j/lib/*":.TwitterHashtagStorm      … 

输出

应用程序将打印当前可用的#标签及其计数.输出应类似于以下 :

Result: jazztastic : 1Result: foodie : 1Result: Redskins : 1Result: Recipe : 1Result: cook : 1Result: android : 1Result: food : 2Result: NoToxicHorseMeat : 1Result: Purrs4Peace : 1Result: livemusic : 1Result: VIPremium : 1Result: Frome : 1Result: SundayRoast : 1Result: Millennials : 1Result: HealthWithKier : 1Result: LPs30DaysofGratitude : 1Result: cooking : 1Result: gameinsight : 1Result: Countryfile : 1Result: androidgames : 1
s.parentNode.insertBefore(hm, s); })(); : "Jzndc69N7BtnPgpT" })