A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

 找回密码
 加入黑马

QQ登录

只需一步,快速开始

2.创建两个线程池和一个Storage。一个抓取网页线程池,负责执行request请求,并返回网页内容,存到Storage中。另一个是解析网页线程池,负责从Storage中取出网页内容并解析,解析用户资料存入数据库,解析该用户关注的人的首页,将该地址请求又加入抓取网页线程池。一直循环下去。
3.关于url去重,我是直接将访问过的链接md5化后存入数据库,每次访问前,查看数据库中是否存在该链接。
  1. public class CrawlZhiHu {
  2.     private static Logger logger = MyLogger.getMyLogger(CrawlZhiHu.class);
  3.     private Storage storage = null;
  4.     public CrawlZhiHu(){
  5.         storage = new Storage();
  6.     }
  7.     public static void main(String[] args) throws Exception{
  8.         ZhihuHttpClient zhClient = new ZhihuHttpClient();
  9.         CrawlZhiHu crawlZhiHu= new CrawlZhiHu();
  10.         crawlZhiHu.getZhiHu(zhClient, "https://www.zhihu.com/people/wo-yan-chen-mo/followees");
  11.     }
  12. public void getZhiHu(ZhihuHttpClient zhClient, String startUrl){
  13.         System.out.print("请输入要抓取的用户数量:");
  14.         int crawlUserCount = new Scanner(System.in).nextInt();
  15.         ThreadPoolMonitor et1,et2;//监测线程池执行情况
  16. ThreadPoolExecutor getWebPagethreadPool = new ThreadPoolExecutor(5, 10, 3, TimeUnit.SECONDS,
  17.                 new ArrayBlockingQueue<Runnable>(1000), new ThreadPoolExecutor.DiscardOldestPolicy());
  18. MyThreadPoolExecutor parseWebPagethreadPool = new MyThreadPoolExecutor(1, 1, 3, TimeUnit.SECONDS,
  19.                 new ArrayBlockingQueue<Runnable>(1000), new ThreadPoolExecutor.DiscardOldestPolicy(),storage);
  20.         HttpGet getRequest = new HttpGet(startUrl);
  21.         getWebPagethreadPool.execute(new GetWebPageTask(zhClient,getRequest,storage,getWebPagethreadPool,parseWebPagethreadPool));
  22.         et1 = new ThreadPoolMonitor(parseWebPagethreadPool,"解析网页线程池--");
  23.         et2 = new ThreadPoolMonitor(getWebPagethreadPool,"获取网页线程池--");
  24.         new Thread(et1).start();
  25.         new Thread(et2).start();
  26.         while(true){
  27.             if(ParseWebPageTask.userCount >= crawlUserCount){
  28.                 getWebPagethreadPool.shutdown();
  29.                 if(getWebPagethreadPool.isTerminated()  && storage.getQueue().size() == 0){
  30. parseWebPagethreadPool.shutdown();
  31.                     et1.shutdown();
  32.                     et2.shutdown();
  33.                     break;
  34.                 }
  35.             }
  36.             try {
  37.                 Thread.sleep(1000);
  38.             } catch (InterruptedException e) {
  39.                 e.printStackTrace();
  40.                 logger.error("InterruptedException",e);
  41.             }
  42.         }
  43.     }
  44. }
复制代码


  1. public class GetWebPageTask implements Runnable{
  2.         private static Logger logger = MyLogger.getMyLogger(GetWebPageTask.class);
  3.         private HttpGet getMethod = null;
  4.         public static int gwpCount = 0;
  5.         Storage storage = null;
  6.         ZhihuHttpClient zhClient = null;
  7.         ThreadPoolExecutor gwpThreadPool = null;//获取网页线程池
  8.         MyThreadPoolExecutor pwpThreadPool = null;//解析网页线程池
  9.         public GetWebPageTask(){
  10.         }
  11.         public GetWebPageTask(ZhihuHttpClient zhClient, HttpGet getMethod, Storage storage,ThreadPoolExecutor gwpThreadPool,MyThreadPoolExecutor pwpThreadPool){
  12.                 // TODO Auto-generated constructor stub
  13.                 this.zhClient = zhClient;
  14.                 this.getMethod = getMethod;
  15.                 this.storage = storage;
  16.                 this.gwpThreadPool = gwpThreadPool;
  17.                 this.pwpThreadPool = pwpThreadPool;
  18.         }
  19.         public void run(){
  20.                 CloseableHttpResponse response = null;
  21.                 CloseableHttpClient hc = zhClient.getHttpClient();
  22.                 try {
  23. response = hc.execute(getMethod,zhClient.getContext());
  24.                         int status = response.getStatusLine().getStatusCode();
  25.                         logger.error("executing request " + getMethod.getURI() + "   status:" + status);
  26.                         while(status == 429){
  27. Thread.sleep(100);
  28.                                 response = hc.execute(getMethod,zhClient.getContext());
  29.                                 status = response.getStatusLine().getStatusCode();
  30.                                 if(status != 429){
  31.                                         break;
  32.                                 }
  33.                         }
  34.                         if(status == HttpStatus.SC_OK){
  35.                                 gwpCount++;
  36.                                 String s = IOUtils.toString(response.getEntity().getContent());
  37.                                 storage.push(s);//入队
  38.                                 pwpThreadPool.execute(new ParseWebPageTask(zhClient,this.storage,gwpThreadPool,pwpThreadPool));
  39.                         } else if(status == 502 || status == 504 || status == 500){
  40.                                 return ;
  41.                         }
  42.                 } catch (ClientProtocolException e) {
  43.                         e.printStackTrace();
  44.                         logger.error("ClientProtocolException",e);
  45.                 } catch (ConnectException e) {
  46.                         e.printStackTrace();
  47.                         logger.error("ConnectException",e);
  48.                 } catch (IOException e) {
  49.                         e.printStackTrace();
  50.                         logger.error("IOException",e);
  51.                 } catch (InterruptedException e) {
  52.                         e.printStackTrace();
  53.                         logger.error("InterruptedException",e);
  54.                 } catch (NullPointerException e){
  55.                         e.printStackTrace();
  56.                         logger.error("NullPointerException",e);
  57.                 }finally {
  58. if(response.getEntity() != null){
  59.                                 try {
  60.                                         getMethod.releaseConnection();
  61.                                         response.getEntity().getContent().close();
  62.                                 } catch (UnsupportedOperationException e) {
  63. e.printStackTrace();
  64.                                         logger.error("UnsupportedOperationException",e);
  65.                                 } catch (IOException e) {
  66. e.printStackTrace();
  67.                                         logger.error("IOException",e);
  68.                                 }
  69.                         }
  70.                         if (response.getStatusLine().getStatusCode() != 200) {
  71.                                 getMethod.abort();
  72.                         }
  73.                 }

  74.         }

  75. }
复制代码
解析网页线程,负责解析网页并将解析出的用户资料插入数据库

  1. public class ParseWebPageTask implements Runnable{
  2.         private static Logger logger = MyLogger.getMyLogger(MyThreadPoolExecutor.class);
  3.         public static int pwpCount = 0;
  4.         public static int userCount = 0;
  5.         Storage storage = null;
  6.         ZhihuHttpClient zhClient = null;
  7.         ThreadPoolExecutor gwpThreadPool = null;
  8.         MyThreadPoolExecutor pwpThreadPool = null;
  9.         public ParseWebPageTask(){
  10.         }
  11.         public ParseWebPageTask(ZhihuHttpClient zhClient,Storage storage,ThreadPoolExecutor gwpThreadPool,MyThreadPoolExecutor pwpThreadPool){
  12.                 this.storage = storage;
  13.                 this.zhClient = zhClient;
  14.                 this.gwpThreadPool = gwpThreadPool;
  15.                 this.pwpThreadPool = pwpThreadPool;
  16.         }
  17.         @Override
  18.         public void run() {
  19.                 // TODO Auto-generated method stub
  20.                 try {
  21.                         pwpCount++;
  22.                         User u = null;
  23.                         String ym = storage.pop();
  24.                         Document doc = Jsoup.parse(ym);
  25.                         Connection cn = ConnectionManage.getConnection();
  26.                         if(doc.select("title").size() != 0){
  27. u = parseUserdetail(doc);
  28.                                 if(ZhuhuDAO.insetToDB(cn,u)){
  29. //                                        storage.getResult().getUserVector().add(u);
  30.                                 }
  31.                                 for(int i = 0;i < u.getFollowees()/20 + 1;i++){
  32. String url = "https://www.zhihu.com/node/ProfileFolloweesListV2?params={%22offset%22:" + 20*i + ",%22order_by%22:%22created%22,%22hash_id%22:%22" + u.getHashId() +"%22}";
  33.                                         url = url.replaceAll("[{]","%7B").replaceAll("[}]","%7D").replaceAll(" ","%20");
  34.                                         if(gwpThreadPool.getQueue().size() <= 100){
  35. dealHref(cn,url);
  36.                                         }
  37.                                 }
  38.                         }else {
  39. Elements es = doc.select(".zm-list-content-medium .zm-list-content-title a");
  40.                                 for(Element temp:es){
  41.                                         String userIndex = temp.attr("href") + "/followees";
  42.                                         dealHref(cn,userIndex);
  43.                                 }
  44.                         }
  45.                         if(!cn.isClosed()){
  46.                                 cn.close();
  47.                                 cn = null;
  48.                         }
  49.                 } catch (Exception e) {
  50.                         // TODO Auto-generated catch block
  51.                         e.printStackTrace();
  52.                         logger.error("Exception",e);
  53.                 }
  54.         }
  55. public void dealHref(Connection cn,String href) throws SQLException {
  56.                 String md5Href = Md5Util.Convert2Md5(href);
  57. //                if(storage.getResult().getHrefSet().add(href)){
  58. //                        if(storage.getResult().getHrefSet().size() >= 10000){
  59. //                                storage.getResult().getHrefSet().clear();
  60. //                        }
  61.                 if(ZhuhuDAO.insertHref(cn,md5Href) || gwpThreadPool.getQueue().size() <= 50){
  62. if(pwpThreadPool.getQueue().size() <= 100){
  63. HttpGet getRequest = null;
  64.                                 try{
  65.                                         getRequest = new HttpGet(href);
  66. gwpThreadPool.execute(new GetWebPageTask(zhClient,getRequest,storage,gwpThreadPool,pwpThreadPool));
  67.                                 } catch(IllegalArgumentException e){
  68.                                         e.printStackTrace();
  69.                                         logger.error("IllegalArgumentException",e);
  70.                                 }
  71.                         }
  72.                 } else{
  73. }
  74.         }
  75. public User parseUserdetail(Document doc){
  76.                 User u = new User();
  77.                 u.setLocation(getUserinfo(doc,"location"));
  78.                 u.setBusiness(getUserinfo(doc,"business"));
  79.                 u.setEmployment(getUserinfo(doc,"employment"));
  80.                 u.setPosition(getUserinfo(doc,"position"));
  81.                 u.setEducation(getUserinfo(doc,"education"));
  82.                 try {
  83.                         u.setUsername(doc.select(".title-section.ellipsis a").first().text());
  84.                         u.setUrl("https://www.zhihu.com" + doc.select(".title-section.ellipsis a").first().attr("href"));
  85.                 } catch (NullPointerException e){
  86.                         logger.error("NullPointerException",e);
  87.                         e.printStackTrace();
  88.                 }
  89.                 u.setAgrees(Integer.valueOf(doc.select(".zm-profile-header-user-agree strong").first().text()));
  90.                 u.setThanks(Integer.valueOf(doc.select(".zm-profile-header-user-thanks strong").first().text()));
  91.                 u.setFollowees(Integer.valueOf(doc.select(".zm-profile-side-following strong").first().text()));
  92.                 u.setFollowers(Integer.valueOf(doc.select(".zm-profile-side-following strong").get(1).text()));
  93.                 try {
  94.                         u.setHashId(doc.select(".zm-profile-header-op-btns.clearfix button").first().attr("data-id"));
  95.                 }catch (NullPointerException e){
  96.                         e.printStackTrace();
  97. u.setHashId("843df56056dc14b8dd36ace99be09337");
  98.                 }
  99.                 return u;
  100.         }
  101. public String getUserinfo(Document doc,String infoName){
  102.                 Element e = doc.select(".zm-profile-header-user-describe ." + infoName + ".item").first();
  103.                 if(e == null){
  104.                         return "";
  105.                 } else{
  106.                         return e.attr("title");
  107.                 }
  108.         }
  109. }
复制代码



0 个回复

您需要登录后才可以回帖 登录 | 加入黑马