After working through 小甲鱼's crawler tutorial, I couldn't wait to write a few crawlers of my own~
Crawler function: scrape movie resources
Python version: 3.6.2
Source code below.
1. Main crawler code:
# Crawler name: movies crawler
# Function: scrape movie resources and generate an html file
# Version: 1.0
# Author: yangxq
# Date: 2017.09.10
import sys
sys.path.append(r'E:\LearnCode\python\MyMod')  # raw string, so the backslashes are not treated as escapes
import gethtml
import savehtml

# Main function
def main():
    target = 'http://www.mp4ba.net/'
    # Number of pages to crawl
    pages = 10

    # Crawl the movie list page by page
    for i in range(1, pages + 1):

        # Build the address of each page
        pageurl = target + 'forum-mp4ba-' + str(i) + '.html'

        # Fetch the movie list on page i
        code, html = gethtml.gethtml(pageurl)
        html = html.decode(code)

        print('\nVisiting page %d/%d : %s\n' % (i, pages, pageurl) + '-' * 50)
        movies_name, movies_url = getmovies(html)
        num = len(movies_name)
        info = []
        # Fetch the magnet links of every movie
        for j in range(num):
            print('\nPage %d, %d/%d | crawling the detail page of %s ...' % (i, j + 1, num, movies_name[j]))
            code, html = gethtml.gethtml(movies_url[j])
            html = html.decode(code)
            movies_summary, magnet = getmagnet(html)
            print('\nGot the resources of %s! Moving on to the next page...' % movies_name[j] + '\n' * 2 + '-' * 30)
            # Pack the movie info as [summary, magnet-link list]
            info.append([movies_summary, magnet])
        # Save each page to the html file
        savehtml.savehtml(movies_name, movies_url, info)

# Extract the movie links on a list page
def getmovies(html):
    movies_name = []
    movies_url = []
    start = 10
    # Look for movie names and links in a loop
    while True:

        # A movie name starts after the marker 'xst' and ends at '<'
        start = html.find('xst', start)
        # Exit the loop when no more 'xst' is found
        if start == -1:
            break
        end = html.find('<', start + 5)
        print('\nFound: %s' % html[start + 5:end])
        movies_name.append(html[start + 5:end])
        # The detail-page link sits in the first 'href' before 'xst' and ends at '"'
        begin = html.find('href=', start - 100)
        stop = html.find('"', begin + 6)
        movies_url.append(html[begin + 6:stop])
        # Move on to the next search segment
        start += 100

    # Return the movie names and addresses found on this page
    num = len(movies_name)
    print('\nPage done! Found %s movies in total\n' % num + '-' * 50)
    return (movies_name, movies_url)

# Extract the magnet links on a detail page
def getmagnet(html):
    magnet = []
    # The summary starts after '◎简 介' and ends at the next '<div'
    start = html.find('◎简 介')
    end = html.find('<div', start + 10)
    movies_summary = html[start + 12:end - 1]
    # Cut off any html code captured by mistake
    div = movies_summary.find('div')
    if div != -1:
        movies_summary = movies_summary[:div]
    print('\nSummary:\n%s' % movies_summary)

    # Look for magnet links in a loop
    end = 0
    while True:
        # A magnet link starts with 'magnet:?' and ends at '>'
        start = html.find('magnet:?', end)
        # Exit the loop when no more 'magnet:?' is found
        if start == -1:
            # Fall back to the site itself when no magnet link exists
            if len(magnet) == 0:
                magnet.append('http://www.mp4ba.net/')
            break
        end = html.find('>', start)
        print('\nMagnet: %s' % html[start:end])
        magnet.append(html[start:end])
        # The loop continues searching from the new end offset

    # Return the summary and the magnet-link list
    return (movies_summary, magnet)

if __name__ == '__main__':
    print('\nCrawler starting...')
    main()
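The parsing in getmovies walks the page with str.find offsets around the 'xst' class marker. The same extraction can also be written as a single regular expression; here is a minimal sketch, assuming the list page wraps each title in an anchor of the form <a href="..." ... class="s xst">Title</a> (that exact markup is an assumption, not something the code above guarantees):

import re

# Hypothetical markup: <a href="thread-123.html" onclick="..." class="s xst">Title</a>
pattern = re.compile(r'href="([^"]+)"[^>]*class="s xst"[^>]*>([^<]+)<')

def getmovies_re(html):
    # Same shape of result as getmovies: ([name, ...], [url, ...])
    movies_name, movies_url = [], []
    for url, name in pattern.findall(html):
        movies_url.append(url)
        movies_name.append(name)
    return movies_name, movies_url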
Since the results have to be written to an html file, the saving logic is split out into its own module.
2. Saving code:
import os
import time

def savehtml(movies_name, movies_url, info):

    htmlinit = '''<!doctype html><html lang="zh-cn"><head><meta charset="GBK"><title>MoviesList</title></head><body><h3 align="center" id="1">Welcome to yangxq's movie crawler</h3><h6 align="center"><strong>Yangxq</strong></h6><hr><hr>'''
    localtime = time.localtime()

    # One file per day: MoviesList_YYYY_M_D.html
    htmlname = 'MoviesList' + '_' + \
               str(localtime[0]) + '_' + \
               str(localtime[1]) + '_' + \
               str(localtime[2]) + '.html'

    # Read what is already there; in 'a+' mode every write goes to the end
    # of the file no matter where the pointer was seeked, so the closing
    # tags are stripped in memory and the whole page is rewritten instead
    html = ''
    if os.path.exists(htmlname):
        with open(htmlname, 'r', encoding='GBK') as f:
            html = f.read()

    # First write: start from the template
    if '<!doctype' not in html:
        html = htmlinit

    # Not the first write: drop the closing tags at the end
    if html.endswith('</p></body></html>'):
        html = html[:-len('</p></body></html>')]

    # Append every movie in the agreed format
    num = len(movies_name)
    for i in range(num):
        # Skip movies that are already saved
        if movies_name[i] in html:
            continue

        # Line 1: the movie name, linked to its detail page
        lines1 = '''<p><a target="_blank" href="''' + movies_url[i] + '''"><big><strong>''' + movies_name[i] + '</strong></big></a><br>' + '-' * 30 + '<br>'
        # Line 2: the movie summary
        lines2 = '<i><small>' + info[i][0] + '</small></i><br>' + '-' * 30 + '<br>'
        # Line 3: the download links
        magnetnum = len(info[i][1])
        lines3 = ''
        for j in range(magnetnum):
            lines3 = lines3 + '''<a href="''' + info[i][1][j] + '''">Download ''' + str(j + 1) + '''</a> '''
        html = html + lines1 + lines2 + lines3 + '<hr>'

    # Closing tags, then write the page back in the encoding the meta tag declares
    html = html + '</p></body></html>'
    with open(htmlname, 'w', encoding='GBK') as f:
        f.write(html)
    print('\nFile saved!')

if __name__ == '__main__':
    movies_name = ['Movie one', 'Movie two']
    movies_url = ['http://www.mp4ba.net/', 'http://www.mp4ba.net/']  # the original test forgot this argument
    info = [['Summary of movie one', ['magnet-link 1 of movie one', 'magnet-link 2 of movie one']],
            ['Summary of movie two', ['magnet-link 1 of movie two', 'magnet-link 2 of movie two']]]
    savehtml(movies_name, movies_url, info)
To make it more reusable, the page-fetching code is split out as well.
3. Page-fetching code:
import os
import random
import time
import urllib.request

def gethtml(url, data=None):
    # Address list and protocol list for the proxy ips
    ipadds = []
    iptype = []
    # Add the scheme if it is missing
    if 'http://' not in url:
        url = 'http://' + url

    # Today's ip file: iplist_YYYY_M_D.txt
    timename = time.localtime()
    ipname = 'iplist_' + str(timename[0]) + '_' + \
             str(timename[1]) + '_' + \
             str(timename[2]) + '.txt'
    # No iplist for today yet: run ipget to refresh it
    if ipname not in os.listdir():
        try:
            import ipget
            ipget.main()
        except ModuleNotFoundError:
            # Fall back to a small fixed pool when ipget is unavailable
            ipadds = ['125.93.148.3:9000', '123.7.38.31:9999', '220.249.185.178:9999']
            iptype = ['HTTP', 'HTTP', 'HTTP']
    if not ipadds:
        with open(ipname, 'r') as ip:
            iplist = ip.read().split('\n')
        for each in iplist:
            # A line looks like 'HTTP : 1.2.3.4:80'; fixed-offset slicing
            # breaks on 'HTTP' vs 'HTTPS', so split on ' : ' instead
            if ' : ' in each:
                ptype, adds = each.split(' : ')
                iptype.append(ptype)
                ipadds.append(adds)

    # Proxy and disguise
    r = len(ipadds)
    i = int(random.uniform(0, r))
    if __name__ == '__main__':
        print('\nThis visit goes through the %s proxy %s ...' % (iptype[i], ipadds[i]))

    # ProxyHandler expects lowercase scheme keys ('http'/'https')
    proxy_support = urllib.request.ProxyHandler({iptype[i].lower(): ipadds[i]})
    opener = urllib.request.build_opener(proxy_support)
    urllib.request.install_opener(opener)
    req = urllib.request.Request(url, data)
    req.add_header('User-Agent', 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36')
    # More headers (Host, Accept, Referer, ...) can be added the same way if a site needs them
    # Visit the page
    response = urllib.request.urlopen(req)
    html = response.read()
    # Be polite: sleep 1~5 seconds between visits
    code = codetest(html)
    time.sleep(int(random.uniform(1, 6)))
    return (code, html)

# Guess the encoding of a page
def codetest(html):
    try:
        html.decode('UTF-8')
        return 'UTF-8'
    except UnicodeDecodeError:
        try:
            html.decode('GBK')
            return 'GBK'
        except UnicodeDecodeError:
            try:
                html.decode('GB18030')
                return 'GB18030'
            except UnicodeDecodeError:
                return 'unknown'

if __name__ == '__main__':
    url = input('Enter a test url: ')
    code, html = gethtml(url)

    print('\nThis page is encoded in: %s' % code)
    if code != 'unknown':
        # '/' and ':' are not legal in file names, so strip the scheme and slashes
        filename = url.replace('http://', '').replace('/', '_') + '.html'
        with open(filename, 'w', encoding=code) as f:
            f.write(html.decode(code))
        print('\nFile written!')
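When used from another script rather than run directly, the module boils down to one call; a minimal sketch (the target site is the one from the main crawler above):

import gethtml

code, html = gethtml.gethtml('www.mp4ba.net')  # the scheme is added automatically
if code != 'unknown':
    text = html.decode(code)
    print(text[:200])  # first 200 characters of the page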
To avoid getting banned, the first run of gethtml each day calls this crawler to fetch a fresh list of proxy ips.
4. IP crawler:
# Crawler name: ipget
# Function: scrape proxy ips from a proxy site and save them by date
# Author: yangxq
# Date: 2017.09.09
import random
import re
import time
import urllib.request

# Main function
def main():
    url = 'http://www.xicidaili.com/'
    code, html = gethtml(url)
    html = html.decode(code)
    iplist = ipfind(html)
    ipsave(iplist)

# Find the ips
def ipfind(html):
    iplist = {}
    ip = []

    # Regex-match the ips, ports and protocols
    ipadds = re.findall(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', html)

    ipport = re.findall(r'<td>\d{1,5}</td>', html)
    ipport = re.findall(r'\d{1,5}', str(ipport))

    iptype = re.findall(r'<td>HTTPS</td>|<td>HTTP</td>', html)
    iptype = re.findall(r'HTTPS|HTTP', str(iptype))
    # The protocol count gives the number of usable ips
    # (entries with other protocols such as socks are dropped)
    ipnum = len(iptype)

    # Join ip and port and load them into a dict with the ip as key
    # and the protocol as value, so duplicate addresses collapse
    for i in range(ipnum):
        ipadd = ipadds[i] + ':' + ipport[i]
        ip.append(ipadd)
        iplist[ip[i]] = iptype[i]

    ipnum = len(iplist)
    if __name__ == '__main__':
        print('\nDuplicates removed, %d ip addresses collected' % ipnum)

    return iplist

def gethtml(url, data=None):

    # A small fixed proxy pool for bootstrapping
    ipadds = ['125.93.148.3:9000', '123.7.38.31:9999', '220.249.185.178:9999']
    iptype = ['HTTP', 'HTTP', 'HTTP']
    # Proxy and disguise
    r = len(ipadds)
    i = int(random.uniform(0, r))
    if __name__ == '__main__':
        print('\nThis visit goes through the %s proxy %s ...' % (iptype[i], ipadds[i]))

    # ProxyHandler expects lowercase scheme keys ('http'/'https')
    proxy_support = urllib.request.ProxyHandler({iptype[i].lower(): ipadds[i]})
    opener = urllib.request.build_opener(proxy_support)
    urllib.request.install_opener(opener)
    req = urllib.request.Request(url, data)
    req.add_header('User-Agent', 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36')
    # Visit the page
    response = urllib.request.urlopen(req)
    html = response.read()
    # Be polite: sleep 1~5 seconds between visits
    code = codetest(html)
    time.sleep(int(random.uniform(1, 6)))
    return (code, html)

# Guess the encoding of a page
def codetest(html):
    try:
        html.decode('UTF-8')
        return 'UTF-8'
    except UnicodeDecodeError:
        try:
            html.decode('GBK')
            return 'GBK'
        except UnicodeDecodeError:
            try:
                html.decode('GB18030')
                return 'GB18030'
            except UnicodeDecodeError:
                return 'unknown'

# Save the ips
def ipsave(iplist):
    timename = time.localtime()

    ipname = 'iplist_' + str(timename[0]) + '_' + \
             str(timename[1]) + '_' + \
             str(timename[2]) + '.txt'
    # One line per ip: 'HTTP : 1.2.3.4:80'
    with open(ipname, 'w') as f:
        for each in iplist:
            f.write(iplist[each] + ' : ' + each)
            f.write('\n')

    if __name__ == '__main__':
        print('\nAll ips saved!')

if __name__ == '__main__':
    main()
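The three findall passes in ipfind assume the proxy site renders one <td> per field. A minimal sketch of what they extract, run against a made-up table row (the row layout is an assumption):

import re

row = '<td>121.232.146.13</td><td>9000</td><td>Jiangsu</td><td>HTTP</td>'

print(re.findall(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', row))  # ['121.232.146.13']
print(re.findall(r'<td>\d{1,5}</td>', row))                    # ['<td>9000</td>']
print(re.findall(r'<td>HTTPS</td>|<td>HTTP</td>', row))        # ['<td>HTTP</td>']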
Newly added: a module I wrote myself. With it, the fetch-the-page step of future crawlers takes much less work.
internet module:
import urllib.request, urllib.parse, urllib.error
import http.cookiejar
import os
import threading
import random
import queue
import chardet
import time
import mylogging

def urlParse(url):
    # Complete a bare address; the scheme check has to come first,
    # otherwise 'www.' would be prepended to a URL that already has a scheme
    if "://" not in url:
        if "www" not in url:
            url = "www." + url
        url = "http://" + url
    return url

class Browse:
    def __init__(self, name="Browser"):
        self.name = name
        self.cookie = False
        # self.proxy = False
        # self.history = []  # browsing history
        self.header = {
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
            "Accept-Encoding": "identity",  # ask for uncompressed data; this header names a compression, not a charset
            "Accept-Language": "zh-CN,zh;q=0.8",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36",
            "Connection": "keep-alive"}
        # Initialise the log
        self._log_ = mylogging.Logging(logPath="Download", name="Browser")

    def get(self, url):
        """
        Fetch a page with GET
        :param url: address of the page
        :return: the decoded html, or an error string
        """
        url = urlParse(url)
        req = urllib.request.Request(url, headers=self.header)
        reTest = 0
        while True:
            if reTest < 3:
                try:
                    response = urllib.request.urlopen(req)
                except urllib.error.HTTPError as e:
                    code = reason = "none"
                    if hasattr(e, "code"):
                        code = e.code
                    if hasattr(e, "reason"):
                        reason = e.reason
                    self._log_.put((self.name, "HTTPError %s %s %s" % (url, code, reason)))
                    return "HTTPError"
                except urllib.error.URLError as e:
                    code = reason = "none"
                    if hasattr(e, "code"):
                        code = e.code
                    if hasattr(e, "reason"):
                        reason = e.reason
                    self._log_.put((self.name, "URLError %s %s %s" % (url, code, reason)))
                    return "URLError"
                except Exception:
                    # Anything else: wait a second and retry, up to three times
                    reTest += 1
                    time.sleep(1)
                    continue
                else:
                    content = response.read()
                    encoding = chardet.detect(content)["encoding"] or "utf-8"  # fall back when detection fails
                    html = content.decode(encoding)
                    self._log_.put((self.name, "Get Html Success! %s" % url))
                    return html
            else:
                self._log_.put((self.name, "OtherError %s" % url))
                return "OtherError"

    def post(self, url, data):
        """
        Fetch a page with POST
        :param url: address of the page
        :param data: form dict
        :return: the decoded html, or an error string
        """
        if self.cookie:
            cjar = http.cookiejar.CookieJar()
            opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(cjar))
            urllib.request.install_opener(opener)
        # url = urlParse(url)
        postdata = urllib.parse.urlencode(data).encode("utf-8")
        req = urllib.request.Request(url, data=postdata, headers=self.header)
        reTest = 0
        while True:
            if reTest < 3:
                try:
                    response = urllib.request.urlopen(req)
                except urllib.error.HTTPError as e:
                    code = reason = "none"
                    if hasattr(e, "code"):
                        code = e.code
                    if hasattr(e, "reason"):
                        reason = e.reason
                    self._log_.put((self.name, "HTTPError %s %s %s" % (url, code, reason)))
                    return "HTTPError"
                except urllib.error.URLError as e:
                    code = reason = "none"
                    if hasattr(e, "code"):
                        code = e.code
                    if hasattr(e, "reason"):
                        reason = e.reason
                    self._log_.put((self.name, "URLError %s %s %s" % (url, code, reason)))
                    return "URLError"
                except Exception:
                    reTest += 1
                    time.sleep(1)
                    continue
                else:
                    content = response.read()
                    encoding = chardet.detect(content)["encoding"] or "utf-8"  # fall back when detection fails
                    html = content.decode(encoding)
                    self._log_.put((self.name, "Post Html Success! %s" % url))
                    return html
            else:
                self._log_.put((self.name, "OtherError %s" % url))
                return "OtherError"

class Download(threading.Thread):
    def __init__(self, srcQueue=queue.Queue(), name="Downloader"):
        threading.Thread.__init__(self)
        self.name = name                # thread name
        self.srcQueue = srcQueue        # download queue
        self.sleeptime = 0              # longest wait between downloads
        self.daemon = True              # daemon thread (the attribute is 'daemon', not 'Daemon')
        self.downloadPath = "Download"  # download directory
        self.srcName = True             # automatic or manual file naming?
        self.srcType = "jpg"            # file extension of the downloads
        self.downloadShow = True        # show a progress bar?
        # Initialise the log
        self._log_ = mylogging.Logging(logPath="Download", name="Downloader")

    def __reporthook__(self, downloaded, perDataSize, allData):
        """
        Show a download progress bar
        :param downloaded: number of blocks fetched so far
        :param perDataSize: size of one block
        :param allData: total size of the file
        :return:
        """
        if self.downloadShow:
            downloadedSize = downloaded * perDataSize
            percent = downloadedSize / allData
            if percent > 1:
                percent = 1
            donePart = int(percent * 50)
            undonePart = 50 - donePart
            line = ">" * donePart + "_" * undonePart
            downloadedSize = downloadedSize / 1024 / 1024  # fetched so far, in MB
            alldataSize = allData / 1024 / 1024            # total size, in MB
            print("Progress [%s%6.2f%%] %.2fMB/%.2fMB" % (line, percent * 100, downloadedSize, alldataSize))

    def __getName__(self, count=0):
        """
        Pick a file name for the next download
        :param count: starting number for automatic names
        :return: the chosen name
        """
        while True:
            if self.srcName:  # automatic naming: first free number
                name = str(count) + "." + self.srcType
                count += 1
                if not os.path.exists(os.path.join(self.downloadPath, name)):
                    break
            else:
                name = input("Please name this download: ")
                break
        return name

    def run(self):
        """
        Thread main loop
        :return:
        """
        if not os.path.exists(self.downloadPath):  # create the download directory
            os.mkdir(self.downloadPath)
        count = 1
        srcUrl = self.srcQueue.get()
        name = self.__getName__(count)
        reTest = 0
        while True:
            if reTest < 3:
                try:
                    path = os.path.join(self.downloadPath, name)  # full target path
                    print("Downloading: %s [from %s...%s]" % (name, srcUrl[:20], srcUrl[-20:]))
                    urllib.request.urlretrieve(srcUrl, path, self.__reporthook__)
                    print("Finished: %s [from %s...%s]" % (name, srcUrl[:20], srcUrl[-20:]))
                except urllib.error.HTTPError as e:
                    code = reason = "none"
                    if hasattr(e, "code"):
                        code = e.code
                    if hasattr(e, "reason"):
                        reason = e.reason
                    self._log_.put((self.name, "HTTPError %s %s %s" % (srcUrl, code, reason)))
                except urllib.error.URLError as e:
                    code = reason = "none"
                    if hasattr(e, "code"):
                        code = e.code
                    if hasattr(e, "reason"):
                        reason = e.reason
                    self._log_.put((self.name, "URLError %s %s %s" % (srcUrl, code, reason)))
                except Exception:
                    reTest += 1
                    time.sleep(1)
                    continue
                else:
                    self._log_.put((self.name, "%s Download Success! %s" % (path, srcUrl)))
            else:
                self._log_.put((self.name, "otherError %s" % srcUrl))
            # Move on to the next url in the queue
            reTest = 0
            srcUrl = self.srcQueue.get()
            name = self.__getName__(count)
            time.sleep(random.uniform(0, self.sleeptime))
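A minimal usage sketch, assuming the module is saved as internet.py (the urls and file type below are made up for illustration, and mylogging must be importable; see the stand-in after the tip):

import queue
import time
import internet

# Fetch a page
browser = internet.Browse(name="test browser")
html = browser.get("example.com")  # urlParse fills in the scheme and 'www.'

# Queue two downloads and let the worker thread drain the queue
q = queue.Queue()
q.put("http://www.example.com/a.jpg")  # hypothetical image urls
q.put("http://www.example.com/b.jpg")
loader = internet.Download(srcQueue=q, name="test downloader")
loader.sleeptime = 2   # wait up to 2 seconds between files
loader.start()
time.sleep(30)         # Download is a daemon thread, so keep the main thread alive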
Tip: mylogging is my own logging module, so I won't post it here; it is really just a put function that takes a piece of info and writes it to the matching text file.
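Based on that description, a minimal stand-in that makes the internet module importable could look like this (the class name, the logPath and name parameters, and the put signature are taken from the calls above; everything else is an assumption):

# mylogging.py - minimal stand-in for the author's logging module
import os
import time

class Logging:
    def __init__(self, logPath="Download", name="log"):
        # One log file per name, stored under logPath
        self.logPath = logPath
        self.name = name

    def put(self, info):
        # info is a (sender, message) tuple; append it with a timestamp
        if not os.path.exists(self.logPath):
            os.mkdir(self.logPath)
        sender, message = info
        stamp = time.strftime("%Y-%m-%d %H:%M:%S")
        with open(os.path.join(self.logPath, self.name + ".log"), "a") as f:
            f.write("%s [%s] %s\n" % (stamp, sender, message))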