首页 > 聚焦 > >正文

Python爬虫几个步骤教你写入mysql数据库

来源:哔哩哔哩2023-02-16 23:13:24


(资料图片)

Python爬虫几个步骤教你写入mysql数据库

Python爬虫实现爬取网站中的数据并存入MySQL数据库中,在爬取的时候总要涉及到数据持久化存储,当然有很多中存储的方式,简单点的有excel、txt、json、csv等等。存入mysql我觉的有好多操作空间,如果是开发python后端也可以熟悉一下sql语句,存入数据库的方法也是试了些许网上一些方法,现在把完整功能供大家参考。

直接搜索 phpStudy安装即可,按照下图配置数据库。用户名密码自行设置,然后返回首页启动即可。

pip install pymysql

打开刚安装的phpstudy安装一个mysql客户端连接,数据库是本地的host可以填 127.0.0.1 或 localhost用户名密码是上面设置的

MySQL创建对应的表

CREATE TABLE `text_archives`  (  `id` int(10) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT 'ID',  `url` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '链接',  `title` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '标题',  `image` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '图片',  `keywords` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '关键描述',  `description` varchar(600) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '内容描述',  `content` text CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL COMMENT '内容',  `weigh` int(10) NOT NULL DEFAULT 0 COMMENT '权重',  `createtime` bigint(16) NOT NULL DEFAULT 0 COMMENT '创建时间',  `updatetime` bigint(16) NOT NULL DEFAULT 0 COMMENT '更新时间',  `deletetime` bigint(16) NULL DEFAULT NULL COMMENT '删除时间',  PRIMARY KEY (`id`) USING BTREE) ENGINE = InnoDB AUTO_INCREMENT = 2692 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_unicode_ci COMMENT = '内容表' ROW_FORMAT = Dynamic;SET FOREIGN_KEY_CHECKS = 1;

构造 SQL 语句的字符串 sql ,然后通过 cursor.excute(sql) 执行,下面简单的封装,直接复制即可用。

import pymysqlclass Mysql(object):    def __init__(self):        self._connect = pymysql.connect(            host='127.0.0.1',            user='test',            password='######',            database='test',            charset='utf8mb4',            cursorclass=pymysql.cursors.DictCursor        )        self._cursor = self._connect.cursor()    def inset_db(self, table_name, insert_data):        try:            data = self.get_mysql_data(data=insert_data)            fields = data[0]            values = data[1]            sql = "INSERT INTO {table_name}({fields}) values ({values})".format(table_name=table_name, fields=fields,                                                                                values=values)            self._cursor.execute(sql)            self._connect.commit()        except Exception as e:            self._connect.rollback()  # 如果这里是执行的执行存储过程的sql命令,那么可能会存在rollback的情况,所以这里应该考虑到            print("数据插入失败,失败原因:", e)            print(insert_data)        else:            # self.db_close()            return self._cursor.lastrowid    def update_db(self, table_name, update_data, wheres=None):        try:            if wheres is not None:                sql = "UPDATE {table_name} SET {update_data} WHERE {wheres}".format(                    table_name=table_name,                    update_data=update_data,                    wheres=wheres                )            else:                sql = "UPDATE {table_name} SET {update_data}".format(                    table_name=table_name,                    update_data=update_data)            self._cursor.execute(sql)            self._connect.commit()        except Exception as e:            print("更新失败:", e)            return False        else:            # self.db_close()            return True    def delete_db(self, table_name, wheres):        try:            # 构建sql语句            sql = "DELETE FROM {table_name} WHERE {wheres}".format(table_name=table_name, wheres=wheres)            self._cursor.execute(sql)            self._connect.commit()        except Exception as e:            print('删除失败:', e)            return False        else:            # self.db_close()            return True    def select_db(self, table_name, fields, wheres=None, get_one=False):        try:            if wheres is not None:                sql = "SELECT {fields} FROM {table_name} WHERE {wheres}".format(                    fields=fields,                    table_name=table_name,                    wheres=wheres                )            else:                sql = "SELECT {fields} FROM {table_name}".format(fields=fields, table_name=table_name)            self._cursor.execute(sql)            self._connect.commit()            if get_one:                result = self._cursor.fetchone()            else:                result = self._cursor.fetchall()        except Exception as e:            print("查询失败", e)            return None        else:            # self.db_close()            return result    def get_mysql_data(self, data):        fields = ""        insert_data = ""        for k, v in data.items():            fields = fields + k + ','            insert_data = insert_data + "'" + str(v) + "'" + ','        fields = fields.strip(',')        insert_data = insert_data.strip(',')        return [fields, insert_data]    def db_close(self):        self._cursor.close()        self._connect.close()

这次简单点咱们用xpath就行,有一个小技巧咱们在爬取的网页打开开发都模式F12.如下图红框复制第一个或都第二个就行。

下面代码是实现爬取数据然后存入数据库类,大家可参考

from model.nav import Navimport requestsfrom urllib import parsefrom lxml import etreefrom fake_useragent import UserAgentfrom lib.reptile import Reptileimport jsonclass Common(object):    def __init__(self, params):        self.url = params['url']        self.params = params        self.blog = 1      def get_header(self):        ua = UserAgent()        headers = {            'User-Agent': ua.random        }        return headers    def get_html(self, url):        # 在超时间内,对于失败页面尝试请求三次        if self.blog <= 3:            try:                res = requests.get(url=url, headers=self.get_header(), timeout=3)                res.encoding = res.apparent_encoding                html = res.text                return html            except Exception as e:                print(e)                self.blog += 1                self.get_html(url)    def json_insert_data(self, params):      category_id = self.insert_category(cname=params['category_name'], pid=params['pid'], icon='')      print("分类插入成功:{}".format(params['category_name']))      if category_id:          url = params['url']          title = params['title']          image = params['image']          description = params['description']          keywords = params['keywords']          content = params['content']          self.insert_archives(category_id, url, title, image, description, keywords, content)          print("内容插入成功:{}".format(title))          print("------------------------------------------------------------")    def get_item(self, xpath_html):        item_list = xpath_html.xpath(self.params['item_xpath'])        print(item_list)        for row in item_list:            url_list = row.xpath(self.params['url_xpath'])            if len(url_list) > 0:                self.get_content(url_list[0])    def get_content(self, url):        print("正在抓取链接:{}".format(url))        domain = parse.urlparse(url).netloc        d_domain = parse.urlparse(self.url).netloc        if domain == d_domain:            html = self.get_html(url)            self.reptile.blog = 1            if html:                p = etree.HTML(html)                title = self.get_conmon_content(p, self.params['title_xpath'])                print("标题为:{}".format(title))                category_name = self.get_conmon_content(p, self.params['category_xpath'])                print("分类为:{}".format(category_name))                image = self.get_conmon_content(p, self.params['image_xpath'])                print("图片为:{}".format(image))                link = self.get_conmon_content(p, self.params['link_xpaht'])                print("链接为:{}".format(link))                description = self.get_conmon_content(p, self.params['description_xpath'])                print("描述为:{}".format(description))                keywords = self.get_conmon_content(p, self.params['keywords_xpath'])                print("关键描述:{}".format(keywords))                content = self.get_conmon_content(p, self.params['content_xpath'])                print("内容为:{}".format(content))                params = {                    "pid": 158,                    "title": title,                    "category_name": category_name,                    "image": image,                    'url': link,                    'description': description,                    'keywords': keywords,                    'content': content,                }                if title and category_name and link:                    self.json_insert_data(params)#存入数据库    def get_conmon_content(self, xpath_html, xpath):        content_list = xpath_html.xpath(xpath)        content = ''        if len(content_list) > 0:            content = content_list[0].strip()        return content    def run(self):        print("url:{}".format(self.url))        html = self.get_html(self.url)        if html:            p = etree.HTML(html)            self.get_item(p)#爬取的xpathparams = {    "url": "https://www.widiz.com/", #爬取url    "url_xpath": './/a[1]/@href',    "title_xpath": '/html/body/div[1]/div[2]/div[3]/div/div[3]/div/h1/text()',    "category_xpath": '/html/body/div[1]/div[2]/div[3]/div/div[3]/div/a[1]/text()',    "image_xpath": '/html/body/div[1]/div[2]/div[3]/div/div[2]/div/img/@src',    "link_xpaht": '/html/body/div[1]/div[2]/div[3]/div/div[3]/div/div/div[1]/span/a/@href',    "description_xpath": '/html/head/meta[10]/@content',    "keywords_xpath": '/html/head/meta[5]/@content',    "content_xpath": '/html/body/div[1]/div[2]/div[3]/main/div[1]/div/div[1]/div/div[2]/text()'}Common(params).run()

最终效果:

标签: 用户名密码 存储过程 创建时间

下一篇: 最后一页
上一篇: 刚刚,广元发布最新职务任免通知|焦点