Python爬虫+Go WebServer+Flutter App(Python篇)

Python爬虫+Go WebServer+Flutter App(Python篇),第1张

概述1.前言对于一个应用来说,需要获取内容、服务端提供内容、客户端展示内容,这个三部分可以通过python,go,flutter结合从而实现一个应用。2.Python爬虫获取内容通过selenium调用浏览器内核,获取对应网页内容,并解析需要的内容,最后通过MySQL保存到数据库。2.1安装python前往官网下载安装包,我

1.前言

对于一个应用来说,需要获取内容、服务端提供内容、客户端展示内容,这个三部分可以通过python,go,Flutter结合从而实现一个应用。

2.Python爬虫获取内容

通过selenium调用浏览器内核,获取对应网页内容,并解析需要的内容,最后通过MysqL保存到数据库。

2.1安装python

前往官网下载安装包,我选择的python2
选择对应系统环境安装包,下载安装完成,设置环境变量
然后在终端输入"python --version",如果显示python版本则安装完成

zxl@zxl-7060:~$ python --versionPython 2.7.12

2.2安装pip

pip 是 Python 包管理工具,该工具提供了对Python 包的查找、下载、安装、卸载的功能。
如果未安装,则通过命令curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py下载安装脚本
再通过命令sudo python get-pip.py安装脚本
通过命令pip --version来判断是否安装成功
出现如下类似信息,则安装成功

zxl@zxl-7060:~$  pip --versionpip 19.1.1 from /home/mi/.local/lib/python2.7/site-packages/pip (python 2.7)

2.3安装selenium

Selenium 是一个用于Web应用程序测试的工具
通过python调用Selenium就像真正的用户在 *** 作浏览器一样,可以很好的解决网页Js加载等问题

pip install selenium

2.4选择浏览器驱动

以Chrome为例,下载Chrome driver,选择电脑中Chrome浏览器对应版本的driver

2.4安装开发工具PyCharm

前往官网下载安装包,选择PyCharm下载
安装好开发工具,如下图所示创建好工程,准备开发

2.5安装MysqL

前往官网下载MysqL,并进行安装

2.6安装mysql-connector

输入命令pip install mysql-connector
这样就可以 *** 作MysqL数据库了

2.7网页请求

1.设置浏览器驱动位置
2.设置不打开浏览器进行网页请求
3.网页请求

#!/usr/bin/python# Coding=utf-8import platformfrom selenium import webdriverclass BaseRequest:    def get_web_content(self, url):        #Chromedriver = "C:\Program files (x86)\Google\Chrome\Application\chromedriver.exe"        Chromedriver = "/Users/zxl/Downloads/Chromedriver"        sysstr = platform.system()        if sysstr == 'Darwin':            Chromedriver = "/Users/zxl/Downloads/Chromedriver"        elif sysstr == 'windows':            Chromedriver = "D:\my_github_workspace\chromedriver.exe"        elif sysstr == 'linux':            Chromedriver = "/Users/zxl/Downloads/Chromedriver"        # 创建Chrome参数对象        opt = webdriver.ChromeOptions()        # 把Chrome设置成***面模式,不论windows还是linux都可以,自动适配对应参数        opt.set_headless()        prefs = {"profile.managed_default_content_settings.images": 2}        opt.add_experimental_option("prefs", prefs)        # 创建Chrome***面对象        driver = webdriver.Chrome(executable_path=Chromedriver, options=opt)        driver.get(url)        return driver

2.8网页内容解析

1.网页请求成功后,获取到该网页对象driver
2.通过xpath进行页面标签解析
3.解析完成关闭浏览器driver

#!/usr/bin/python# Coding=utf-8import datetimeimport hashlibimport refrom selenium.common.exceptions import NoSuchElementExceptionfrom selenium.webdriver.common.by import Byfrom selenium.webdriver.support.wait import webdriverwaitfrom selenium.webdriver.support import expected_conditionsfrom com_zxl_spIDer_db.JokeDB import JokeDBfrom com_zxl_spIDer_request.BaseRequest import *from com_zxl_spIDer_data.JokeBean import *class RequestQsbkTxt(BaseRequest):    def __init__(self):        global jokeDB        jokeDB = JokeDB()    def parse(self, end_url, index):        print "parse::end_url = ", end_url, "::index = ", index        driver = self.get_web_content("https://www.qiushibaike.com/" + end_url + str(index))        elem1 = webdriverwait(driver, 10).until(            expected_conditions.presence_of_element_located((By.XPATH, '//ul[@]')))        print "elem1 = ", elem1        elem2 = webdriverwait(driver, 10).until(            expected_conditions.presence_of_element_located((By.XPATH, '//div[@]')))        print "elem2 = ", elem2        # page_source = driver.page_source        isFindNextPage = False        paginationObject = driver.find_element_by_xpath('//ul[@]')        PagelistObject = paginationObject.find_elements_by_xpath('.//li')        for pageItemObject in PagelistObject:            page_index_txt = pageItemObject.text            print "pageItemObject::page_index_txt = ", page_index_txt            itemFindResult = re.findall(".*?(\d+).*?", page_index_txt)            print "pageItemObject::itemFindResult = ", itemFindResult            if len(itemFindResult) > 0:                if int(itemFindResult[0]) > index:                    index = int(itemFindResult[0])                    isFindNextPage = True                    break                # if index - int(itemFindResult[0]) == 1:                #     index = int(itemFindResult[0])                #     isFindNextPage = True                #     break        print "parse::isFindNextPage = ", isFindNextPage, "::index = ", index, "::end_url = ",        hotPicJokeItemPath = '//div[@]'        hotPicJokeItems = driver.find_elements_by_xpath(hotPicJokeItemPath)        print 'hotPicJokeItems length = ', len(hotPicJokeItems)        for hotPicJokeItem in hotPicJokeItems:            jokeID = hotPicJokeItem.get_attribute('ID')            md5Object = hashlib.md5()            md5Object.update(jokeID.encode('utf-8'))            jokeMd5Value = md5Object.hexdigest()            authorObject = hotPicJokeItem.find_element_by_xpath('.//div[@]')            authorNickObject = authorObject.find_element_by_xpath('.//h2')            authorNickname = authorNickObject.text            authorimgObject = authorObject.find_element_by_xpath('.//img')            authorimgurl = authorimgObject.get_attribute('src')            authorGender = ''            authorAge = -1            try:                authorGenderObject = authorObject.find_element_by_xpath(".//div[starts-with(@class,'articleGender')]")                authorGender = authorGenderObject.get_attribute('class')                authorAge = authorGenderObject.text            except NoSuchElementException as e:                print e            contentObject = hotPicJokeItem.find_element_by_xpath('.//div[@]')            content = contentObject.text            thumbimgurl = ''            try:                thumbObject = hotPicJokeItem.find_element_by_xpath('.//div[@]')                thumbimgObject = thumbObject.find_element_by_xpath('.//img')                thumbimgurl = thumbimgObject.get_attribute('src')            except NoSuchElementException as e:                print e            statsVoteContent = ''            statsCommentContent = ''            statsCommentDetailUrl = ''            try:                statsObject = hotPicJokeItem.find_element_by_xpath('.//div[@]')                try:                    statsVoteObject = statsObject.find_element_by_xpath('.//span[@]')                    statsVoteContent = statsVoteObject.text                except NoSuchElementException as e:                    print e                try:                    statsCommentObject = statsObject.find_element_by_xpath('.//span[@]')                    statsCommentContent = statsCommentObject.find_element_by_xpath(                        './/a[@]').text                    statsCommentDetailUrl = statsCommentObject.find_element_by_xpath(                        './/a[@]').get_attribute('href')                except NoSuchElementException as e:                    print e            except NoSuchElementException as e:                print e            # print authorNickname            # print authorGender            # print authorAge            # print authorimgurl            # print content            # print thumbimgurl            # print statsVoteContent            # print statsCommentContent            # print statsCommentDetailUrl            # print jokeID            # print jokeMd5Value            # print '\n'            # print '======================================end=========================================='            # print '\n'            joke_bean = JokeBean()            joke_bean = joke_bean.create_joke_bean(                authorNickname.encode('utf-8'),                authorGender,                authorAge,                authorimgurl,                content.encode('utf-8'),                thumbimgurl,                statsVoteContent,                statsCommentContent,                statsCommentDetailUrl,                jokeMd5Value)            isExistJokeItem = jokeDB.query_by_md5(jokeMd5Value)            print isExistJokeItem            if isExistJokeItem is None:                print "not ExistJokeItem"                jokeDB.insert_joke(joke_bean)            else:                print "ExistJokeItem"                driver.close()                return        print "==============end================="        print "\n"        driver.close()        if not isFindNextPage:            return        else:            self.parse(end_url, index)    def clas_db(self):        if jokeDB is not None:            jokeDB.close_db()    def start_task(self):        print "start_task::", 'Now Time::', datetime.datetime.Now().strftime('%Y-%m-%d %H:%M:%s')        self.parse("pic/page/", 1)        self.clas_db()if __name__ == "__main__":    request = RequestQsbkTxt()    # request.parse("pic/page/", 1)    request.parse("pic/page/", 1)    request.clas_db()

2.9数据集合

1.根据解析内容构造数据集合

#!/usr/bin/python# Coding=utf-8class JokeBean:    def create_joke_bean(self,                         author_nick_name,                         author_gender,                         author_age,                         author_img_url,                         content,                         thumb_img_url,                         stats_Vote_content,                         stats_comment_content,                         stats_comment_detail_url,                         md5):        bean = {'author_nick_name': author_nick_name,                'author_gender': author_gender,                'author_age': author_age,                'author_img_url': author_img_url,                'content': content,                'thumb_img_url': thumb_img_url,                'stats_Vote_content': stats_Vote_content,                'stats_comment_content': stats_comment_content,                'stats_comment_detail_url': stats_comment_detail_url,                'md5': md5}        return bean

2.10保存数据库

1.设置数据库ip地址,端口号
2.设置连接数据用户名,密码
3.设置连接的数据库名称
4.如果数据库不存在,则创建数据库,并创建相应的表
5.设置增删改查 *** 作

#!/usr/bin/python# Coding=utf-8import MysqL.connectorfrom MysqL.connector import errorcodeclass BaseDB:    host = 'zxltest.zicp.vip'    port = '42278'    urser_name = "***"    pass_word = "***"    db_name = 'joke'    CREATE_table_sql = ("")    def __init__(self):        global cnx        global cursor        try:            cnx = MysqL.connector.connect(user=self.urser_name, password=self.pass_word, host=self.host, port=self.port, database=self.db_name)            cursor = cnx.cursor()        except MysqL.connector.Error as err:            if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:                print("Something is wrong with your user name or password")                exit(1)            elif err.errno == errorcode.ER_BAD_DB_ERROR:                print("Database does not exist")                cnx = MysqL.connector.connect(user=self.urser_name, password=self.pass_word, host=self.host, port=self.port)                cursor = cnx.cursor()                self.__create_database()                self.__create_table()            else:                print(err)                exit(1)        else:            self.__create_table()            print("DBUtil init finish")    def __create_database(self):        try:            cursor.execute("CREATE DATABASE {} DEFAulT CHaraCTER SET 'utf8'".format(self.db_name))            cnx.database = self.db_name            print("Create database finish")        except MysqL.connector.Error as err:            print("Failed creating database: {}".format(err))            exit(1)    def __create_table(self):        # for name, ddl in CityDB.tableS.iteritems():        print "create table::", self.CREATE_table_sql        try:            print("Creating table {}: ".format(self.CREATE_table_sql),)            cursor.execute(self.CREATE_table_sql)        except MysqL.connector.Error as err:            if err.errno == errorcode.ER_table_EXISTS_ERROR:                print("already exists.")            else:                print(err.msg)                exit(1)        else:            print("OK")    def query(self, sql_str):        print "query::", sql_str        cursor.execute(sql_str)        return cursor    def insert(self, sql_str, param):        cursor.execute(sql_str, param)        cnx.commit()    def update(self, sql_str):        cursor.execute(sql_str)        cnx.commit    def delete(self, sql_str):        cursor.execute(sql_str)        cnx.commit()    def close_db(self):        cursor.close()        cnx.close()

#!/usr/bin/python# Coding=utf-8import MysqLfrom MysqL.connector import errorcodefrom com_zxl_spIDer_data.JokeBean import JokeBeanfrom com_zxl_spIDer_db.BaseDB import BaseDBclass JokeDB(BaseDB):    table_name = 'joke'    ColUME_ID = 'ID'    ColUME_AUTHOR_NICK_name = 'author_nick_name'    ColUME_AUTHOR_GENDER = 'author_gender'    ColUME_AUTHOR_AGE = 'author_age'    ColUME_AUTHOR_img_URL = 'author_img_url'    ColUME_CONTENT = 'content'    ColUME_THUMB_img_URL = 'thumb_img_url'    ColUME__STATS_Vote_CONTENT = 'stats_Vote_content'    ColUME_STATS_COMMENT_CONTENT = 'stats_comment_content'    ColUME_STATS_COMMENT_DETAIL_URL = 'stats_comment_detail_url'    ColUME_MD5 = 'md5'    CREATE_table_sql = (        "CREATE table IF NOT EXISTS " + table_name + " ("        "  " + ColUME_ID + " bigint(20) NOT NulL auto_INCREMENT,"        "  " + ColUME_AUTHOR_NICK_name + "  varchar(16),"        "  " + ColUME_AUTHOR_GENDER + " text,"        "  " + ColUME_AUTHOR_AGE + " text,"        "  " + ColUME_AUTHOR_img_URL + " text,"        "  " + ColUME_CONTENT + " text,"        "  " + ColUME_THUMB_img_URL + " text,"        "  " + ColUME__STATS_Vote_CONTENT + " text,"        "  " + ColUME_STATS_COMMENT_CONTENT + " text,"        "  " + ColUME_STATS_COMMENT_DETAIL_URL + " text,"        "  " + ColUME_MD5 + " text,"        "  PRIMARY KEY (" + ColUME_ID + ")"        ") ENGINE=InnoDB")    INSERT_JOKE_sql = ("INSERT INTO " + table_name + " ("                                                     + ColUME_AUTHOR_NICK_name + ","                                                     + ColUME_AUTHOR_GENDER + ","                                                     + ColUME_AUTHOR_AGE + ","                                                     + ColUME_AUTHOR_img_URL + ","                                                     + ColUME_CONTENT + ","                                                     + ColUME_THUMB_img_URL + ","                                                     + ColUME__STATS_Vote_CONTENT + ","                                                     + ColUME_STATS_COMMENT_CONTENT + ","                                                     + ColUME_STATS_COMMENT_DETAIL_URL + ","                                                     + ColUME_MD5                                                     + ") "                                                     + "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)")    query_JOKE_BY_MD5 = ("SELECT "                         + ColUME_AUTHOR_NICK_name + ","                         + ColUME_AUTHOR_GENDER + ","                         + ColUME_AUTHOR_AGE + ","                         + ColUME_AUTHOR_img_URL + ","                         + ColUME_CONTENT + ","                         + ColUME_THUMB_img_URL + ","                         + ColUME__STATS_Vote_CONTENT + ","                         + ColUME_STATS_COMMENT_CONTENT + ","                         + ColUME_STATS_COMMENT_DETAIL_URL + ","                         + ColUME_MD5                         + " FROM " + table_name                         + " WHERE " + ColUME_MD5 + " = '%s'")    def create_insert_data(self, joke_bean):        return (            joke_bean['author_nick_name'],            joke_bean['author_gender'],            joke_bean['author_age'],            joke_bean['author_img_url'],            joke_bean['content'],            joke_bean['thumb_img_url'],            joke_bean['stats_Vote_content'],            joke_bean['stats_comment_content'],            joke_bean['stats_comment_detail_url'],            joke_bean['md5']        )    def insert_joke(self, joke_bean):        self.insert(self.INSERT_JOKE_sql, self.create_insert_data(joke_bean))    def query_by_md5(self, md5):        cursor = self.query(self.query_JOKE_BY_MD5 % (md5,))        for (ColUME_AUTHOR_NICK_name,             ColUME_AUTHOR_GENDER,             ColUME_AUTHOR_AGE,             ColUME_AUTHOR_img_URL,             ColUME_CONTENT,             ColUME_THUMB_img_URL,             ColUME__STATS_Vote_CONTENT,             ColUME_STATS_COMMENT_CONTENT,             ColUME_STATS_COMMENT_DETAIL_URL,             ColUME_MD5) in cursor:            jokeBean = JokeBean()            return jokeBean.create_joke_bean(ColUME_AUTHOR_NICK_name,                                             ColUME_AUTHOR_GENDER,                                             ColUME_AUTHOR_AGE,                                             ColUME_AUTHOR_img_URL,                                             ColUME_CONTENT,                                             ColUME_THUMB_img_URL,                                             ColUME__STATS_Vote_CONTENT,                                             ColUME_STATS_COMMENT_CONTENT,                                             ColUME_STATS_COMMENT_DETAIL_URL,                                             ColUME_MD5)        return None

               

总结

以上是内存溢出为你收集整理的Python爬虫+Go WebServer+Flutter App(Python篇)全部内容,希望文章能够帮你解决Python爬虫+Go WebServer+Flutter App(Python篇)所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/langs/1186214.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-06-03
下一篇 2022-06-03

发表评论

登录后才能评论

评论列表(0条)

保存