|
一直觉得自己之前写的使用定时抓取构建IP代理池实在过于简陋,并且有一部分的代码写的并不合理,刚好最近又在学习多线程,就将之前的代码进行了重构,也方便对抓取代理ip有需求的人。之前自己写的那篇文章就不删除了,里面用到了MySQL以及循环调用ip的方法(一些东西也是值得了解的。取其精华,弃其糟粕吧),大家有兴趣的可以看一下(最主要的还是不舍得访问量,哈哈)。 注:由于xici代理网的ip代理并不是很稳定,所以自己实现的ip代理池并不可用(4000个IP最后通过一系列逻辑处理和过滤之后大概只剩30多个,并且这30个ip也极不稳定),但感觉实现ip代理池的原理就是这样,在往后估计也就是性能上的优化。虽然ip代理池并不可用,但是如果你想要学习多线程以及抓取ip做爬虫的话,这篇文章我觉得是可以给你一些思路上的启发。 怎么设计一个IP代理池其实设计一个IP代理池是非常容易的一件事情,我们来看一下其中的步骤: 1.首先肯定是从提供代理ip的网站上对ip进行抓取 2.对抓取下来的ip进行初步的过滤,比如我一开始就将IP类型不是HTTPS并且IP链接速度大于2秒的给过滤掉了 3.对于符合我们要求的IP,我们要对其进行质量检测,判断其是否可用,这一步就是检测IP的质量,也就是这一步刷掉了大量的IP 4.将符合要求的ip写进Redis数据库中,我是以List形式存储在Redis中 5.设定一个进行抓取的周期,来更新你的IP代理池(在将新的IP抓取下来并进行处理之后,我们将原数据库清空,并将新的IP写入其中)
第五个我是直接每次先清空数据库,然后在进行新IP的抓取以及过滤然后才写入数据库中,这样明显是不合理的,因为它会造成你的IP代理池有很长一段时间是空缺的,这也是我在写这篇博客的时候才想到的一个不合理的地方(太懒,代码也就不改了,大家知道问题就行)。 整体架构![]()
实现细节HttpResponse的阻塞在我们用代理IP进行网页抓取的时候,经常会发生长时间的阻塞然后程序报错(具体原因不祥,如果你真的有兴趣弄明白,建议看源码),这个时候我们只要设定好链接时间并进行恰当的try,catch就可以解决这个问题。 如下面代码: /** * setConnectTimeout:设置连接超时时间,单位毫秒. * setConnectionRequestTimeout:设置从connect Manager获取Connection 超时时间,单位毫秒. * 这个属性是新加的属性,因为目前版本是可以共享连接池的. * setSocketTimeout:请求获取数据的超时时间,单位毫秒.如果访问一个接口,多少时间内无法返回数据, * 就直接放弃此次调用。 */HttpHost proxy = new HttpHost(ip, Integer.parseInt(port));RequestConfig config = RequestConfig.custom().setProxy(proxy).setConnectTimeout(3000).setSocketTimeout(3000).build();HttpGet httpGet = new HttpGet(url);httpGet.setConfig(config);try { //客户端执行httpGet方法,返回响应 CloseableHttpResponse httpResponse = httpClient.execute(httpGet); //得到服务响应状态码 if (httpResponse.getStatusLine().getStatusCode() == 200) { entity = EntityUtils.toString(httpResponse.getEntity(), "utf-8"); } httpResponse.close(); httpClient.close(); } catch (ClientProtocolException e) { entity = null; } catch (IOException e) { entity = null; } return entity;}- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
线程类的实现多线程需要注意的就是一点:线程安全,还有就是保证线程同步而不发生脏读,这些东西需要结合整体代码逻辑去把握,我就在这里不细说了。考虑大家有可能产生疑惑,所以我在代码中进行了详细的注释,大家有兴趣的可以在我github上查看源码,有不懂的或有疑惑的同学欢迎在评论区提问与讨论。 我贴一下提供多线程抓取代理ip的服务类: public class IPPool { //成员变量(非线程安全) private List<IPMessage> ipMessages; public IPPool(List<IPMessage> ipMessages) { this.ipMessages = ipMessages; } public void getIP(List<String> urls) { String ipAddress; String ipPort; for (int i = 0; i < urls.size(); i++) { //随机挑选代理IP(仔细想了想,本步骤由于其他线程有可能在位置确定之后对ipMessages数量进行增加,虽说不会改变已经选择的ip代理的位置,但合情合理还是在对共享变量进行读写的时候要保证其原子性,否则极易发生脏读) //每个线程先将自己抓取下来的ip保存下来并进行过滤与检测 List<IPMessage> ipMessages1 = new ArrayList<>(); String url = urls.get(i); synchronized (ipMessages) { int rand = (int) (Math.random()*ipMessages.size()); out.println("当前线程 " + Thread.currentThread().getName() + " rand值: " + rand + " ipMessages 大小: " + ipMessages.size()); ipAddress = ipMessages.get(rand).getIPAddress(); ipPort = ipMessages.get(rand).getIPPort(); } //这里要注意Java中非基本类型的参数传递方式,实际上都是同一个对象 boolean status = URLFecter.urlParse(url, ipAddress, ipPort, ipMessages1); //如果ip代理池里面的ip不能用,则切换下一个IP对本页进行重新抓取 if (status == false) { i--; continue; } else { out.println("线程:" + Thread.currentThread().getName() + "已成功抓取 " + url + " ipMessage1:" + ipMessages1.size()); } //对ip重新进行过滤,只要速度在两秒以内的并且类型为HTTPS的 ipMessages1 = IPFilter.Filter(ipMessages1); //对ip进行质量检测,将质量不合格的ip在List里进行删除 IPUtils.IPIsable(ipMessages1); //将质量合格的ip合并到共享变量ipMessages中,进行合并的时候保证原子性 synchronized (ipMessages) { out.println("线程" + Thread.currentThread().getName() + "已进入合并区 " + "待合并大小 ipMessages1:" + ipMessages1.size()); ipMessages.addAll(ipMessages1); } } }}- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
将IPMessages(保存IP信息的对象)写入Redis中由于Redis只支持字符串与字节流数据,所以我们要想将一个对象存储到Redis的List中,则必须将对象序列化(转换成字节流),反之,如果我们想要将Redis中的数据拿出来,就要反序列化。关于序列化与反序列化的知识不懂的大家百度,我就不细说了。来看一下这部分的代码: 序列化以及反序列化(Java中好像有现成的方法,我没有具体了解): /** * Created by hg_yi on 17-8-9. * * java.io.ObjectOutputStream代表对象输出流,它的writeObject(Object obj)方法 * 可对参数指定的obj对象进行序列化,把得到的字节序列写到一个目标输出流中。 * * java.io.ObjectInputStream代表对象输入流,它的readObject()方法一个源输入流中读 * 取字节序列,再把它们反序列化为一个对象,并将其返回。 * * 对象序列化包括如下步骤: * 1)创建一个对象输出流,它可以包装一个其他类型的目标输出流,如文件输出流(我这里是字节流); * 2)通过对象输出流的writeObject()方法写对象。 * * 对象反序列化的步骤如下: * 1)创建一个对象输入流,它可以包装一个其他类型的源输入流,如文件输入流(我这里是字节流); * 2)通过对象输入流的readObject()方法读取对象。 */public class SerializeUtil { public static byte[] serialize(Object object) { ObjectOutputStream oos; ByteArrayOutputStream baos; try { // 序列化 baos = new ByteArrayOutputStream(); oos = new ObjectOutputStream(baos); oos.writeObject(object); byte[] bytes = baos.toByteArray(); return bytes; } catch (Exception e) { e.printStackTrace(); } return null; } //反序列化 public static Object unserialize(byte[] bytes) { ByteArrayInputStream bais; ObjectInputStream ois; try { // 反序列化 bais = new ByteArrayInputStream(bytes); ois = new ObjectInputStream(bais); return ois.readObject(); } catch (Exception e) { e.printStackTrace(); } return null; }}- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
最后,定时任务的实现我没有再用第一篇博客里面讲的quartz,代码太多并且不好理解。当时太年轻,还不知道Timer这个Java自带的产生定时任务的类,它的实现相当于是开了一个单独的线程去帮助执行我们所设定的任务,它的用法我就不细说了,源码里有,非常方便,并且代码量也很少。 好了,废话说了这么多,贴上大家想要的东西: 戳我获得源码哦~
|