使用 Python 进行多线程数据爬取和存储到 MongoDB 的实战案例

一、前言

这一篇同样是spa爬虫练习,关卡(一),使用多线程技术提高数据获取的效率,最后将数据存储到 MongoDB 数据库中。无论你是 Python 初学者还是已经有一定经验的开发者,都可以通过这个案例来学习如何使用 requests 库进行网络请求、使用 concurrent.futures 库进行多线程操作,以及使用 pymongo 库操作 MongoDB 数据库。

二、导入必要的库

1
2
3
4
5
6
import pymongo.errors  # 导入 pymongo 的异常类,用于处理 MongoDB 操作时可能出现的异常
import requests # 导入 requests 库,用于发送 HTTP 请求
import time # 导入 time 库,用于计算程序运行时间
import pymongo # 导入 pymongo 库,用于操作 MongoDB 数据库
from concurrent.futures import ThreadPoolExecutor, as_completed # 导入 ThreadPoolExecutor 和 as_completed,用于创建线程池和管理线程任务的完成情况
from threading import Lock # 导入 Lock 类,用于实现线程同步
  • pymongo.errors:当我们使用 pymongo 库操作 MongoDB 时,可能会出现各种错误,导入这个模块可以帮助我们处理这些异常。
  • requests:这个库是 Python 中非常流行的用于发送 HTTP 请求的工具,我们可以使用它来获取网页或 API 的内容。
  • time:用来计算程序运行的时间,方便我们评估程序的性能。
  • pymongo:提供了操作 MongoDB 数据库的接口,使我们可以存储和检索数据。
  • ThreadPoolExecutor 和 as_completed:这两个来自 concurrent.futures 库,它们是 Python 中进行多线程编程的强大工具。ThreadPoolExecutor 可以创建一个线程池,方便我们管理多个线程;as_completed 可以让我们等待线程任务完成,并获取它们的结果。
  • Lock:来自 threading 库,在多线程编程中,为了避免多个线程同时访问和修改同一资源导致的数据竞争,我们使用 Lock 来确保线程安全。

三、定义 spa1_thread

收起

1
2
3
4
5
6
7
8
9
10
11
class spa1_thread:  # 定义一个名为 spa1_thread 的类
def __init__(self): # 类的构造函数,用于初始化对象的属性
self.url = 'https://spa1.scrape.center/api/movie/?limit=10&offset={}' # 存储请求的基础 URL,{} 用于后续格式化插入 offset 值
self.headers = { # 存储请求头,模拟浏览器请求,用于反爬虫等
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36',
'Referer': 'https://spa1.scrape.center/'
}
self.all_moves = [] # 存储从 API 获取的电影信息的列表
self.client = pymongo.MongoClient(host='localhost', port=27017) # 创建 MongoDB 客户端,连接到本地 MongoDB 服务器
self.db = self.client['spa1']['movies_thread'] # 获取名为 'spa1' 的数据库和名为 'movies_thread' 的集合
self.lock = Lock() # 创建一个锁对象,用于线程同步,避免多线程并发操作时的数据竞争
  • __init__ 方法:这是类的构造函数,当我们创建

    1
    spa1_thread

    类的实例时会自动调用这个方法。

    • self.url:存储了一个基础的 URL 模板,其中 {} 是一个占位符,我们可以使用 format 方法将 offset 值插入到这个位置,实现翻页功能。
    • self.headers:存储请求头信息,这是为了模拟浏览器请求,很多网站会检查请求头来防止爬虫,我们设置 User-AgentReferer 来模拟一个正常的浏览器访问。
    • self.all_moves:一个空列表,用于存储从 API 获取的电影信息。
    • self.client:使用 pymongo.MongoClient 连接到本地的 MongoDB 服务器,host 是服务器地址,port 是端口号,这里是本地的默认地址和端口(localhost:27017)。
    • self.db:选择 spa1 数据库和 movies_thread 集合,后续的数据将存储在这里。
    • self.lock:创建一个锁对象,确保多线程环境下的数据操作安全。

四、获取数据的方法 get_data_info

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 翻页获取请求并实现格式化数据
def get_data_info(self, page): # 定义一个方法,用于根据页码获取数据
try: # 尝试执行以下代码块,如果出现异常则捕获并处理
print(f'现在正在获取第{page}页!') # 打印当前正在获取的页码
url = self.url.format(page) # 格式化 URL,将页码插入到 URL 的 {} 中
response = requests.get(url, headers=self.headers) # 发送 GET 请求,获取响应
if response.status_code == 200: # 检查响应的状态码是否为 200,表示请求成功
json_info = response.json() # 将响应内容解析为 JSON 格式
for move_data in json_info['results']: # 遍历 JSON 数据中的 results 列表
move = { # 创建一个字典存储电影信息
'id': move_data.get('id'), # 获取电影的 id
"name": move_data.get('name'), # 获取电影的名称
"alias": move_data.get('alias'), # 获取电影的别名
"cover": move_data.get('cover'), # 获取电影的封面信息
"categories": move_data.get('categories'), # 获取电影的分类
"published_at": move_data.get('published_at'), # 获取电影的发布时间
"minute": move_data.get('minute'), # 获取电影的时长
"score": move_data.get('score'), # 获取电影的评分
"regions": move_data.get('regions') # 获取电影的地区
}
# 使用锁来保证数据添加的原子性
with self.lock: # 加锁,确保同一时间只有一个线程可以添加数据
self.all_moves.append(move) # 将电影信息添加到 all_moves 列表中
else: # 如果状态码不为 200
print(f"请求{page}页时,状态码异常:{response.status_code}") # 打印异常状态码
except Exception as e: # 捕获异常
print(f"请求第{page}页是发生异常!异常为:{e}!") # 打印异常信息
  • get_data_info 方法:接收一个

    1
    page

    参数,根据页码获取电影信息。

    • print(f'现在正在获取第{page}页!'):在控制台打印正在获取的页码,方便我们跟踪程序的进度。
    • url = self.url.format(page):使用 format 方法将 page 插入到 self.url 中,形成完整的请求 URL。
    • response = requests.get(url, headers=self.headers):使用 requests.get 发送 GET 请求,同时带上请求头,这样服务器会认为我们是一个正常的浏览器请求。
    • if response.status_code == 200:检查响应的状态码是否为 200,这表示请求成功。
    • json_info = response.json():将响应的内容解析为 JSON 格式,方便我们提取数据。
    • for move_data in json_info['results']:遍历 results 列表,这个列表包含了电影的信息。
    • 创建一个 move 字典存储电影信息,使用 get 方法从 move_data 中提取每个字段的值。
    • with self.lock:使用 Lock 确保在多线程环境下,将 move 字典添加到 self.all_moves 列表的操作是原子性的,避免多个线程同时添加数据导致混乱。
    • 如果状态码不为 200,打印状态码异常信息;如果请求过程中发生异常,也会打印异常信息。

五、主函数 main

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 开启多线程主函数
def main(self): # 主函数,用于协调多线程操作和数据存储
start_time = time.time() # 记录开始时间
try: # 尝试执行以下代码块,如果出现异常则捕获并处理
with ThreadPoolExecutor(max_workers=10) as t: # 创建一个最大线程数为 10 的线程池
t1 = [t.submit(self.get_data_info, page) for page in range(0, 100)] # 提交 100 个任务到线程池,每个任务调用 get_data_info 方法并传递不同的页码
for future in as_completed(t1): # 遍历已完成的任务
future.result() # 获取任务结果,如果任务发生异常会在这里抛出
except Exception as e: # 捕获异常
print(f"线程执行时发送异常:{e}") # 打印异常信息
# 添加保存函数
self.save_info_mongo() # 调用 save_info_mongo 方法将数据保存到 MongoDB
end_time = time.time() # 记录结束时间
print(f"数据爬取过程总时间:{end_time - start_time}秒") # 计算并打印数据爬取的总时间
  • main 方法:
    • start_time = time.time():使用 time.time() 记录程序开始的时间。
    • with ThreadPoolExecutor(max_workers=10) as t:创建一个最大线程数为 10 的线程池,这样可以同时运行多个任务,提高程序效率。
    • t1 = [t.submit(self.get_data_info, page) for page in range(0, 100)]:使用列表推导式和 submit 方法将 get_data_info 方法提交到线程池,同时将 0 到 99 的页码作为参数传递给 get_data_info 方法。
    • for future in as_completed(t1):使用 as_completed 迭代器,等待线程任务完成,future.result() 可以获取任务的结果,如果任务发生异常会在此处抛出。
    • self.save_info_mongo():调用 save_info_mongo 方法将数据存储到 MongoDB。
    • end_time = time.time():记录程序结束的时间。
    • print(f"数据爬取过程总时间:{end_time - start_time}秒"):计算并打印程序运行的总时间,这样可以评估程序性能。

六、保存数据到 MongoDB 的方法 save_info_mongo

1
2
3
4
5
6
7
8
9
10
11
12
# 将数据保存进 MongoDB 数据库
def save_info_mongo(self): # 定义一个方法,用于将数据保存到 MongoDB 数据库
try: # 尝试执行以下代码块,如果出现异常则捕获并处理
if self.all_moves: # 检查 all_moves 列表是否有数据
result = self.db.insert_many(self.all_moves) # 将 all_moves 列表中的数据插入到 MongoDB 中
print(f'已保存:{len(result.inserted_ids)}条数据') # 打印插入的数据数量
self.db.create_index([('id', pymongo.ASCENDING)]) # 为 'id' 字段创建一个升序索引
print("索引创建成功") # 打印索引创建成功的信息
else: # 如果 all_moves 列表为空
print(f"列表为空,无数据可保存!") # 打印无数据可保存的信息
except pymongo.errors.PyMongoError as e: # 捕获 MongoDB 操作异常
print(f"数据保存到 MongoDB 时发生错误:{e}!") # 打印异常信息
  • save_info_mongo 方法:
    • if self.all_moves:检查 self.all_moves 列表是否有数据,如果有数据才进行存储操作。
    • result = self.db.insert_many(self.all_moves):使用 insert_many 方法将 self.all_moves 中的所有数据一次性插入到 MongoDB 中,这样可以提高插入效率。
    • print(f'已保存:{len(result.inserted_ids)}条数据'):打印插入的数据数量。
    • self.db.create_index([('id', pymongo.ASCENDING)]):为 id 字段创建一个升序索引,方便后续查询和排序。
    • 如果 self.all_moves 列表为空,打印相应信息;如果存储过程中发生 pymongo 异常,打印异常信息。

七、程序入口

1
2
3
if __name__ == '__main__':  # 程序入口
spa = spa1_thread() # 创建 spa1_thread 类的实例
spa.main() # 调用 main 方法开始执行程序
  • 这部分代码确保程序只有在作为脚本直接运行时才会执行,而不是作为模块被导入时执行。首先创建 spa1_thread 类的实例,然后调用 main 方法开始整个程序的执行。

八、总结

通过这个例子,我们学习了如何使用 Python 实现一个简单的数据爬取和存储程序。使用 requests 库可以方便地发送网络请求,使用 ThreadPoolExecutor 可以让我们的程序利用多线程并发执行,提高效率。同时,使用 pymongo 库将数据存储到 MongoDB 中,还使用了 Lock 确保多线程操作的数据安全。当然,在实际应用中,你可以根据自己的需求修改 headers 来适应不同的网站,调整线程数量,添加更多的异常处理,以及根据数据量调整存储策略等。希望这个案例可以帮助你更好地理解 Python 多线程编程和数据库操作,让你在数据爬取和存储的道路上迈出重要的一步。

九、注意事项

  • 请确保 MongoDB 服务正在运行,并且 MongoDB 服务的地址和端口正确,否则会导致连接失败。
  • 多线程操作时,虽然使用了 Lock 确保数据添加的原子性,但也会带来性能开销,根据实际情况调整线程池大小和锁的使用。
  • 对于 insert_many 操作,如果数据量很大,可以考虑分批插入,以避免性能问题。
  • 可以根据服务器性能和网络状况调整 max_workers 的数量,以避免给服务器带来过大压力。
  • 对于网络请求,可以添加更多的异常处理,例如设置 requests.get 的超时时间,如 requests.get(url, headers=self.headers, timeout=10)
  • 对于 MongoDB 的操作,根据实际情况可以添加更多的异常处理,例如处理插入重复数据的异常等。

十、完整代码附上

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import pymongo.errors
import requests
import time
import pymongo
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock


class spa1_thread:
def __init__(self):
self.url = 'https://spa1.scrape.center/api/movie/?limit=10&offset={}'
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36',
'Referer': 'https://spa1.scrape.center/'
}
self.all_moves = []
self.client = pymongo.MongoClient(host='localhost', port=27017)
self.db = self.client['spa1']['movies_thread']
self.lock = Lock()

# 翻页获取请求并实现格式化数据
def get_data_info(self, page):
try:
print(f'现在正在获取第{page}页!')
url = self.url.format(page)
response = requests.get(url, headers=self.headers)
if response.status_code == 200:
json_info = response.json()
for move_data in json_info['results']:
move = {
'id': move_data.get('id'),
"name": move_data.get('name'),
"alias": move_data.get('alias'),
"cover": move_data.get('cover'),
"categories": move_data.get('categories'),
"published_at": move_data.get('published_at'),
"minute": move_data.get('minute'),
"score": move_data.get('score'),
"regions": move_data.get('regions')
}
# 使用锁来保证数据添加的原子性
with self.lock:
self.all_moves.append(move)
else:
print(f"请求{page}页时,状态码异常:{response.status_code}")
except Exception as e:
print(f"请求第{page}页是发生异常!异常为:{e}!")

# 开启多线程主函数
def main(self):
start_time = time.time()
try:
with ThreadPoolExecutor(max_workers=10) as t:
t1 = [t.submit(self.get_data_info, page) for page in range(0, 100)]
for future in as_completed(t1):
future.result()
except Exception as e:
print(f"线程执行时发送异常:{e}")

# 添加保存函数
self.save_info_mongo()
end_time = time.time()
print(f"数据爬取过程总时间:{end_time - start_time}秒")

# 将数据保存进MongoDB数据库
def save_info_mongo(self):
try:
if self.all_moves:
result = self.db.insert_many(self.all_moves)
print(f'已保存:{len(result.inserted_ids)}条数据')

self.db.create_index([('id', pymongo.ASCENDING)])
print("索引创建成功")
else:
print(f"列表为空,无数据可保存!")
except pymongo.errors.PyMongoError as e:
print(f"数据保存到MongoDB时发生错误:{e}!")


if __name__ == '__main__':
spa = spa1_thread()
spa.main()

—碎碎念:

准备打造一个python圈子,交流,学习,兼职搞钱!!!想进的可留言,可联系我邮箱看到会回复!

希望这篇博客对你学习 Python 多线程和数据存储有所帮助,如果你有任何问题或建议,欢迎在评论区留言哦 让我们一起学习,共同进步!