当前位置: 首页 > news >正文

网站建设公司的市场开发方案微信小程序开发工具

网站建设公司的市场开发方案,微信小程序开发工具,xml天气预报网站怎么做,如何做拦截网站文章目录 1 第三方库2 爬取2.1 初始化函数2.2 结束时关闭数据库2.3 生成header2.4 获取请求body2.5 解析异步json数据2.6 使用BS4的find方法解析2.7 写入口函数2.8 调用 3 完整代码 1 第三方库 获取随机UA pip install fake-useragent连接数据库 $ pip3 install PyMySQL发起…

文章目录

  • 1 第三方库
  • 2 爬取
    • 2.1 初始化函数
    • 2.2 结束时关闭数据库
    • 2.3 生成header
    • 2.4 获取请求body
    • 2.5 解析异步json数据
    • 2.6 使用BS4的find方法解析
    • 2.7 写入口函数
    • 2.8 调用
  • 3 完整代码

1 第三方库

  • 获取随机UA
pip install fake-useragent
  • 连接数据库
$ pip3 install PyMySQL
  • 发起请求
pip install requests
  • 解析页面
pip install beautifulsoup4
  • 进度条
pip install tqdm

2 爬取

2.1 初始化函数

  • 新建爬虫类
class mySpider:
  • 创建数据库连接和初始化url
# 初始化urldef __init__(self, url):self.url = url# 计数,请求一个页面的次数,初始值为1self.count = 1# 数据库连接对象self.db = pymysql.connect(host='localhost',port=3306,user='root',password='123456',database='test')# 创建游标对象self.cursor = self.db.cursor()

2.2 结束时关闭数据库

  • 关闭数据库释放资源,方法运行完后调用。
# 结束断开数据库连接def __del__(self):self.cursor.close()self.db.close()print("关闭数据库!")

2.3 生成header

  • 使用第三方库fake-useragent生成随机UA
# 获取一个headerdef getHeader(self):# 实例化ua对象ua = UserAgent()# 随机获取一个uaheaders = {'User-Agent': ua.random}return headers

2.4 获取请求body

  • 注意有返回值的递归,要把返回值返回,回调时加return
def getBody(self, url, send_type, data):# 每次请求都随机停顿一些时间# time.sleep(random.randint(1, 2))# 在超时时间内,对于失败页面尝试请求三次if self.count <= 3:try:if send_type == 'get':res = requests.get(url=url, headers=self.getHeader(), params=data, timeout=2)elif send_type == 'post':res = requests.post(url=url, headers=self.getHeader(), data=data, timeout=2)else:print("未输入send_type,直接返回None")res = Nonereturn resexcept Exception as e:print(e)self.count += 1print(f"第{self.count}次,发起请求")# 再次调用自己,并把值返回,(注意要加return)return self.getBody(url, send_type, data)

2.5 解析异步json数据

  • 解析异步json数据
def parseData(self, dataList):# 循环查看详情for row in tqdm(dataList, desc='爬取进度'):# 请求详情页urlurlDetail = f"https://www.baidu.com/CTMDS/pub/PUB010100.do?method=handle04&compId={row['companyId']}"# 发起请求# 每次请求都初始化一次self.countself.count = 1res = self.getBody(url=urlDetail, send_type='get', data={})if res is not None:# 解析htmlself.parseHtml(row=row, htmlText=res.text)else:print(f"{urlDetail}请求失败!")

2.6 使用BS4的find方法解析

  • find_all() 方法用来搜索当前 tag 的所有子节点,并判断这些节点是否符合过滤条件,最后以列表形式将符合条件的内容返回,语法格式如下
    find_all( name , attrs , recursive , text , limit )
  • 参数说明
  • name:查找所有名字为 name 的 tag 标签,字符串对象会被自动忽略。
  • attrs:按照属性名和属性值搜索 tag 标签,注意由于 class 是 Python 的关键字吗,所以要使用 “class_”。
  • recursive:find_all() 会搜索 tag 的所有子孙节点,设置 recursive=False 可以只搜索 tag 的直接子节点。
  • text:用来搜文档中的字符串内容,该参数可以接受字符串 、正则表达式 、列表、True。
  • limit:由于 find_all() 会返回所有的搜索结果,这样会影响执行效率,通过 limit 参数可以限制返回结果的数量。
def parseHtml(self, row, htmlText):soup = BeautifulSoup(htmlText, 'html.parser')# 获取备案信息divList = soup.find_all('div', class_=['col-md-8'])divtextList = [re.sub(r'\s+', '', div.text) for div in divList]# 获取其他机构地址divListOther = soup.find_all('div', class_=['col-sm-8'])divtextListOther = [re.sub(r'\s+', '', div.text) for div in divListOther]otherOrgAdd = ','.join(divtextListOther)# 插入数据库companyId = row['companyId']linkTel = row['linkTel']recordNo = row['recordNo']areaName = row['areaName']linkMan = row['linkMan']address = row['address']compName = row['compName']recordStatus = row['recordStatus']cancelRecordTime = row.get('cancelRecordTime', '')compLevel = divtextList[2]recordTime = divtextList[6]sql1 = "insert INTO medical_register(company_id,area_name,record_no,comp_name,address,link_man,link_tel,record_status,comp_level,record_time,cancel_record_time,other_org_add) "sql2 = f"values('{companyId}','{areaName}','{recordNo}','{compName}','{address}','{linkMan}','{linkTel}','{recordStatus}','{compLevel}','{recordTime}','{cancelRecordTime}','{otherOrgAdd}')"sql3 = sql1 + sql2# 执行sqlself.cursor.execute(sql3)# 提交self.db.commit()# 获取备案专业和主要研究者信息tbody = soup.find('tbody')trList = tbody.find_all('tr')# 对tr循环获取tdfor tr in trList:tdList = tr.find_all('td')tdTextList = [td.text for td in tdList]tdTextList.insert(0, companyId)# print(tdTextList)# 插入数据库sql4 = "insert into medical_register_sub (company_id,professional_name,principal_investigator,job_title) values(%s,%s,%s,%s)"self.cursor.execute(sql4, tdTextList)# 提交到数据库self.db.commit()

2.7 写入口函数

  • 这里pageSize直接干到最大,懂的都懂!
def run(self):try:# 拿第一页的数据data = {'pageSize': 1350, 'curPage': 1}# 每次请求都初始化一次self.countself.count = 1res = self.getBody(url=self.url, send_type='post', data=data)if res is not None:# 加载为jsonjsonRes = json.loads(res.text)# 查看响应状态码status = jsonRes['success']# 如果状态为Trueif status == True:# 获取数据dataList = jsonRes['data']# 处理数据self.parseData(dataList=dataList)else:print(f"{self.url}请求失败")except Exception as e:print('发生错误!', e)

2.8 调用

  • 调用
if __name__ == '__main__':spider = mySpider('https://www.baidu.com/CTMDS/pub/PUB010100.do?method=handle05')spider.run()

3 完整代码

  • 完整代码
import requests
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
import time
import random
import json
import re
import pymysql
from tqdm import tqdmclass mySpider:# 初始化urldef __init__(self, url):self.url = url# 计数,请求一个页面的次数,初始值为1self.count = 1# 数据库连接对象self.db = pymysql.connect(host='localhost',port=3306,user='root',password='logicfeng',database='test2')# 创建游标对象self.cursor = self.db.cursor()# 结束断开数据库连接def __del__(self):self.cursor.close()self.db.close()print("关闭数据库!")# 获取一个headerdef getHeader(self):# 实例化ua对象ua = UserAgent()# 随机获取一个uaheaders = {'User-Agent': ua.random}return headers# 获取请求bodydef getBody(self, url, send_type, data):# 每次请求都随机停顿一些时间# time.sleep(random.randint(1, 2))# 在超时时间内,对于失败页面尝试请求三次if self.count <= 3:try:if send_type == 'get':res = requests.get(url=url, headers=self.getHeader(), params=data, timeout=2)elif send_type == 'post':res = requests.post(url=url, headers=self.getHeader(), data=data, timeout=2)else:print("未输入send_type,直接返回None")res = Nonereturn resexcept Exception as e:print(e)self.count += 1print(f"第{self.count}次,发起请求")# 再次调用自己,并把值返回,(注意要加return)return self.getBody(url, send_type, data)# 解析bodydef parseData(self, dataList):# 循环查看详情for row in tqdm(dataList, desc='爬取进度'):# 请求详情页urlurlDetail = f"https://www.baidu.com/CTMDS/pub/PUB010100.do?method=handle04&compId={row['companyId']}"# 发起请求# 每次请求都初始化一次self.countself.count = 1res = self.getBody(url=urlDetail, send_type='get', data={})if res is not None:# 解析htmlself.parseHtml(row=row, htmlText=res.text)else:print(f"{urlDetail}请求失败!")# 解析页面def parseHtml(self, row, htmlText):soup = BeautifulSoup(htmlText, 'html.parser')# 获取备案信息divList = soup.find_all('div', class_=['col-md-8'])divtextList = [re.sub(r'\s+', '', div.text) for div in divList]# 获取其他机构地址divListOther = soup.find_all('div', class_=['col-sm-8'])divtextListOther = [re.sub(r'\s+', '', div.text) for div in divListOther]otherOrgAdd = ','.join(divtextListOther)# 插入数据库companyId = row['companyId']linkTel = row['linkTel']recordNo = row['recordNo']areaName = row['areaName']linkMan = row['linkMan']address = row['address']compName = row['compName']recordStatus = row['recordStatus']cancelRecordTime = row.get('cancelRecordTime', '')compLevel = divtextList[2]recordTime = divtextList[6]sql1 = "insert INTO medical_register(company_id,area_name,record_no,comp_name,address,link_man,link_tel,record_status,comp_level,record_time,cancel_record_time,other_org_add) "sql2 = f"values('{companyId}','{areaName}','{recordNo}','{compName}','{address}','{linkMan}','{linkTel}','{recordStatus}','{compLevel}','{recordTime}','{cancelRecordTime}','{otherOrgAdd}')"sql3 = sql1 + sql2# 执行sqlself.cursor.execute(sql3)# 提交self.db.commit()# 获取备案专业和主要研究者信息tbody = soup.find('tbody')trList = tbody.find_all('tr')# 对tr循环获取tdfor tr in trList:tdList = tr.find_all('td')tdTextList = [td.text for td in tdList]tdTextList.insert(0, companyId)# print(tdTextList)# 插入数据库sql4 = "insert into medical_register_sub (company_id,professional_name,principal_investigator,job_title) values(%s,%s,%s,%s)"self.cursor.execute(sql4, tdTextList)# 提交到数据库self.db.commit()# 入口函数def run(self):try:# 拿第一页的数据data = {'pageSize': 1350, 'curPage': 1}# 每次请求都初始化一次self.countself.count = 1res = self.getBody(url=self.url, send_type='post', data=data)if res is not None:# 加载为jsonjsonRes = json.loads(res.text)# 查看响应状态码status = jsonRes['success']# 如果状态为Trueif status == True:# 获取数据dataList = jsonRes['data']# 处理数据self.parseData(dataList=dataList)else:print(f"{self.url}请求失败")except Exception as e:print('发生错误!', e)if __name__ == '__main__':spider = mySpider('https://www.百度.com/CTMDS/pub/PUB010100.do?method=handle05')spider.run()
http://www.zhongyajixie.com/news/27066.html

相关文章:

  • vs做网站教程湖南长沙疫情最新消息
  • 口碑最好的购物网站平台网站搭建服务
  • 商务网站建设与管理怎么查询搜索关键词
  • 宝安网站公司小红书关键词热度查询
  • 健康管理公司网站建设app推广引流
  • 学校网站的建设费用吗seo综合查询是啥意思
  • 优化网站教程网络营销ppt讲解
  • 多语言外贸网站建设seo推广顾问
  • 创新的商城网站建网站设计与制作
  • 卖域名的网站要怎么做营销推广软文案例
  • 动效网站建设正规的培训机构有哪些
  • 工程建设项目招标范围和规模标准规定网站百度关键词seo排名优化
  • 创业做网站 优帮云企业官网seo
  • 做网站怎么自定义背景图片长尾关键词挖掘爱站网
  • 文艺小清新ppt模板赣州seo外包
  • jsp做网站遇到的问题三只松鼠搜索引擎营销案例
  • 色系网站新闻稿撰写
  • 专门做同人h的网站全媒体广告代理加盟
  • 深圳宝安住房和建设局网站广告图片
  • 教你用模板做网站百度大数据搜索引擎
  • 想找人做网站和app郑州网络推广公司排名
  • 最美logo图案大全长沙百度快速优化排名
  • 免费开店铺搜索引擎优化seo培训
  • 杭州哪家公司网站做的好交换友情链接平台
  • 莞城做网站公司百度宣传推广
  • 苏小小移动网站腾讯nba新闻
  • 个人做搜索网站违法吗seo免费培训教程
  • 郑州网站建设三牛外贸网站设计
  • 西安网站建设公销售营销方案100例
  • 学生做网站的软件郑州网络公司