1 爬虫案例-单线程(spa1关卡)

1.1 前言:ssr关卡都太简单了,没有写详细的过程,以我初学者的水平来看,ssr关卡随便使用个xpath、re,保存cvs或者数据库内都可以(之前的代码未保存。。。。),不在详细赘述。
1.2 奉上链接:Python爬虫案例 | Scrape Center
2 开始步入正题,先说下我的思路:第一关可以说是新手村,比ssr来讲,就是从静态数据换到了ajax接口,变成了动态数据,只需要简单的F12抓xhr包就可以轻松看到json数据,因为我也是新手,所以教程能细则细,也为了后边忘记代码怎么写能回来有个看的案例,因为是json数据,没办法使用xpath,正则也不需要,杀鸡用牛刀了纯纯是,直接看代码!
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
dimport requests
import pymongo
import csv
from datetime import datetime

class spa1:
# 初始化url和headers参数
def __init__(self):
self.url = 'https://spa1.scrape.center/api/movie/?limit=10&offset={}'
self.headers = {
'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0'
}
self.all_movies = []
self.filename = 'moves.csv'
self.client = pymongo.MongoClient(host='localhost', port=27017)
self.db = self.client['spa1']['movies']
# self.headers = ['ID','NAME','ALIAS','COVER','CATEGORIES','PUBLISHED_AT','MINUTE','SCORE','REGIONS']


# 定义翻页函数url_update
def url_update(self):
for page in range(0,100):
url = self.url.format(page)
print(f"现在正在获取此:{url}")
try:
json_info = requests.get(url, headers=self.headers, timeout=5).json()
# print(json_info)
# 判断页面是否为空
if not json_info['results']:
print(f"第{page}页已经没有数据,停止爬取")
break
self.moves_data(json_info['results'])
except Exception as e:
print(f"请求失败:{url}, 错误信息:{e}")
rum = 3
while rum > 0:
try:
json_info = requests.get(url, headers=self.headers, timeout=5).json()
self.moves_data(json_info['results'])
break
except:
rum -= 1
continue


# 定义对获取的数据进行处理函数moves_data
def moves_data(self, json_info):
if not isinstance(json_info, list):
print(f"获取的数据不是列表类型,请检查数据类型")
return

for move in json_info:
move_data = {
'ID': move.get('id'),
'NAME': move.get('name'),
'ALIAS': move.get('alias'),
'COVER': move.get('cover'),
'CATEGORIES': move.get('categories'),
'PUBLISHED_AT': move.get('published_at'),
'MINUTE': move.get('minute'),
'SCORE': move.get('score'),
'REGIONS': move.get('regions')
}
self.all_movies.append(move_data)
print(f"已收集电影数据:{len(self.all_movies)}")


# 定义保存函数save_info_cvs
def save_info_cvs(self):
if not self.all_movies:
print("没有电影数据")
return
else:
print('列表中存在数据,可以开始保存了!!!')

# 获取当前时间
times = datetime.now().strftime('%Y%m%d_%H%M%S')
# 定义文件名
# filename = f'data_{times}.csv' # 采用时间戳命名需把次注释打开
# 采用固定文件命名,在类实例位置添加self.filename = 'movies.csv'
# 定义表头
try:
headers = ['ID','NAME','ALIAS','COVER','CATEGORIES','PUBLISHED_AT','MINUTE','SCORE','REGIONS']
# 写入csv文件
with open(self.filename,'w',encoding='utf-8',newline='') as f:
# 创建csv写入对象
writer = csv.DictWriter(f,fieldnames=headers)
# 写入表头,且只在第一次写入表头
if f.tell() == 0:
writer.writeheader()
# 写入数据
for move_data in self.all_movies:
writer.writerow(move_data)
print(f"已保存电影:{move_data['NAME']}")
except Exception as e:
print(f"保存文件时错误,错误信息:{e}")


def save_info_mongo(self):
if not self.all_movies:
print('无电影数据可以保存!!!')
return
else:
print('列表中存在数据,可以开始保存了!!!')

try:
# 批量保存数据到MongoDB
result = self.db.insert_many(self.all_movies)
print(f"已保存{len(result.inserted_ids)}条数据")

# 创建索引
self.db.create_index([('ID', pymongo.ASCENDING)])
print("创建索引成功")
except Exception as e:
print(f"保存数据到MongoDB时发生错误,错误信息:{e}")

# 注:如果采用时间戳的命名方式保存数据,则会生成N个文件,如果采用固定csv文件名,则只会生成一个文件保存.

if __name__ == "__main__":
spider=spa1()
spider.url_update()
# spider.save_info_cvs() # 将数据存入cvs表格
spider.save_info_mongo() # 将数据存入mongodb数据库

3 这个代码中并没有添加多线程模块,采用的是单线程,多线程代码我会在下一篇文章中写出,下面详细说下每个函数的作用!

image-20250119214450255

3.1 引入request、pymongo、csv、datetime库,没错这个代码我定义了俩个方法,可以分别把数据存储到数据库或者保存为csv格式,因为单线程的速度特别慢,所以我有研究学习了多线程的方法来提高请求速度~
接上文,定义了spa1类,初始化url、headers属性,all_movies是空列表,filename是定义保存csv的文件名称,client是连接mongodb数据库,db是在mongodb数据库中创建数据库和表。据了解,__init__是魔术方法,在这的作用是可以确保初始化的函数能够在程序执行前被函数调用。

未完待续。。。。