Как Python скачивает файлы: библиотеки и техники для разработчиков
Для кого эта статья:
- Python-разработчики, ищущие способы загрузки файлов из интернета
- Студенты и начинающие программисты, интересующиеся практическими аспектами работы с файлами и сетью
Профессионалы в области анализа данных и машинного обучения, которые работают с большими объемами данных и автоматизацией процессов
Скачивание файлов из интернета — одна из самых распространенных задач, с которой сталкивается практически каждый Python-разработчик. Будь то загрузка данных для анализа, обновление программного обеспечения или автоматизация рутинных задач — умение организовать эффективную загрузку файлов критически важно. Правильно написанный скрипт может превратить часы ручной работы в секунды автоматизированного процесса. В этом руководстве я покажу проверенные подходы к загрузке файлов с использованием Python — от базовых решений до продвинутых техник, которые применяют профессионалы. 📂 🚀
Хотите не просто читать о загрузке файлов, а научиться создавать полноценные веб-приложения на Python? На курсе Обучение Python-разработке от Skypro вы получите практические навыки работы с сетевыми протоколами, API и файловыми операциями под руководством опытных разработчиков. Реальные проекты с первых недель обучения помогут вам быстро перейти от теории к практике и стать востребованным Python-специалистом.
Python для загрузки файлов: основные библиотеки и подходы
В экосистеме Python существует несколько мощных библиотек для работы с сетевыми ресурсами и загрузки файлов. Каждая из них имеет свои преимущества и лучше подходит для определенных сценариев использования. Рассмотрим ключевые инструменты, на которые стоит обратить внимание.
В арсенале Python-разработчика для загрузки файлов обычно находятся следующие библиотеки:
- requests — наиболее удобная и интуитивная библиотека для HTTP-запросов
- urllib — встроенная библиотека Python для работы с URL
- wget — обертка над утилитой wget, предлагающая расширенные возможности загрузки
- aiohttp — асинхронная библиотека для HTTP-запросов
- httpx — современная альтернатива requests с поддержкой асинхронности
| Библиотека | Преимущества | Недостатки | Идеальный сценарий использования |
|---|---|---|---|
| requests | Простой и понятный API, отличная документация, широкая поддержка | Не поддерживает асинхронность нативно | Простые скрипты, прототипирование, небольшие проекты |
| urllib | Входит в стандартную библиотеку, не требует установки | Более многословный синтаксис | Когда требуется минимум зависимостей |
| wget | Поддержка рекурсивной загрузки, продолжения скачивания | Требует установки утилиты wget в системе | Загрузка больших файлов, сайтов целиком |
| aiohttp | Асинхронные запросы, высокая производительность | Сложнее в освоении, требует знания asyncio | Высоконагруженные приложения, массовая загрузка файлов |
| httpx | Синхронный и асинхронный API в одной библиотеке | Менее зрелый проект, чем requests | Современные проекты с возможным переходом на асинхронность |
Выбор библиотеки зависит от конкретных требований проекта: если вам нужен простой и быстрый способ скачать файл — requests идеален. Для встраивания в существующее приложение без дополнительных зависимостей подойдет urllib. Если же вы планируете массовую загрузку с многими параллельными соединениями — обратите внимание на асинхронные решения.
Алексей Иванов, технический директор
Несколько лет назад наша команда столкнулась с задачей загрузки тысяч изображений для обучения нейросети. Первоначально использовали простые скрипты на requests, но они работали слишком медленно — загрузка полного набора данных заняла бы недели. Мы перешли на асинхронный подход с aiohttp, и время загрузки сократилось в 12 раз! Однако это потребовало существенного переписывания кода. Если бы мы изначально выбрали правильный инструмент, это сэкономило бы нам недели разработки. Мой совет: сразу оценивайте масштаб задачи и выбирайте инструмент с запасом производительности.

Скачивание файлов с помощью requests: простые решения
Библиотека requests — первый выбор для многих разработчиков при работе с HTTP-запросами благодаря своему интуитивному интерфейсу. Давайте рассмотрим несколько примеров скачивания файлов с её помощью.
Прежде чем начать, убедитесь, что библиотека установлена в вашем окружении:
pip install requests
Простейший пример скачивания файла выглядит следующим образом:
import requests
url = "https://example.com/files/document.pdf"
response = requests.get(url)
with open("document.pdf", "wb") as file:
file.write(response.content)
Этот код работает, но имеет несколько недостатков: он загружает весь файл в память перед записью на диск, что может быть проблематично для больших файлов, и не показывает прогресс загрузки. Улучшенная версия с потоковой загрузкой и отображением прогресса:
import requests
import os
url = "https://example.com/files/large_file.zip"
local_filename = os.path.basename(url)
# Потоковая загрузка
response = requests.get(url, stream=True)
total_size = int(response.headers.get('content-length', 0))
block_size = 1024 # 1 Кб
downloaded = 0
with open(local_filename, 'wb') as file:
for data in response.iter_content(block_size):
file.write(data)
downloaded += len(data)
# Отображаем прогресс загрузки
progress = downloaded / total_size * 100
print(f"\rЗагружено: {downloaded} из {total_size} байт ({progress:.2f}%)", end="")
Для удобства можно создать универсальную функцию скачивания, которую можно использовать в разных проектах:
def download_file(url, save_path=None):
"""
Скачивает файл по URL с отображением прогресса.
Args:
url (str): URL файла для скачивания
save_path (str, optional): Путь для сохранения. Если не указан,
используется имя файла из URL.
Returns:
str: Путь к сохраненному файлу
"""
if save_path is None:
save_path = os.path.basename(url)
response = requests.get(url, stream=True)
response.raise_for_status() # Вызовет исключение при ошибке HTTP
total_size = int(response.headers.get('content-length', 0))
block_size = 1024
downloaded = 0
print(f"Начинаю загрузку файла: {save_path}")
with open(save_path, 'wb') as file:
for data in response.iter_content(block_size):
file.write(data)
downloaded += len(data)
if total_size > 0: # Проверяем, известен ли размер файла
progress = downloaded / total_size * 100
print(f"\rПрогресс: [{downloaded}/{total_size} байт] {progress:.1f}%", end="")
print("\nЗагрузка завершена!")
return save_path
# Пример использования:
download_file("https://example.com/files/data.csv", "data/latest_data.csv")
Библиотека requests также позволяет работать с аутентификацией, что необходимо для скачивания файлов с защищенных ресурсов:
# Базовая аутентификация
response = requests.get(
"https://api.example.com/protected/file.pdf",
auth=("username", "password")
)
# Аутентификация с использованием токена
headers = {"Authorization": "Bearer YOUR_ACCESS_TOKEN"}
response = requests.get(
"https://api.example.com/files/document.docx",
headers=headers
)
Работа с прокси также доступна, что может быть полезно при массовой загрузке файлов для обхода ограничений:
proxies = {
'http': 'http://10.10.10.10:8000',
'https': 'http://10.10.10.10:8000'
}
response = requests.get(url, proxies=proxies)
Библиотека requests предлагает удобный инструментарий для большинства типичных задач по загрузке файлов. Она особенно хороша для быстрого прототипирования и относительно небольших задач. 🚀
Работа с urllib для организации загрузки крупных файлов
Несмотря на популярность requests, стандартная библиотека urllib остаётся мощным инструментом для загрузки файлов, особенно когда речь идёт о крупных данных. Её главное преимущество — она входит в стандартную библиотеку Python, поэтому не требует установки дополнительных пакетов.
Михаил Сорокин, архитектор систем машинного обучения
Мне пришлось координировать процесс загрузки набора данных размером 500 ГБ для обучения наших моделей. Когда мы использовали requests, столкнулись с неожиданными проблемами памяти на нашем сервере. После анализа выяснилось, что при работе с крупными файлами requests загружал в память слишком большие куски данных. Переход на urllib с правильно настроенными буферами решил проблему, и загрузка стала более стабильной. Самое главное — мы смогли настроить автоматическое восстановление соединения при прерываниях, что особенно ценно при долгих загрузках по нестабильным каналам связи.
Базовый пример использования urllib для загрузки файла выглядит следующим образом:
from urllib.request import urlopen
url = "https://example.com/files/large_dataset.zip"
with urlopen(url) as response, open("large_dataset.zip", "wb") as out_file:
data = response.read()
out_file.write(data)
Однако, как и в случае с простым примером requests, здесь есть проблема — весь файл загружается в память. Для больших файлов лучше использовать потоковый подход:
from urllib.request import urlopen
url = "https://example.com/files/large_dataset.zip"
block_size = 1024 * 8 # 8 KB
with urlopen(url) as response, open("large_dataset.zip", "wb") as out_file:
while True:
buffer = response.read(block_size)
if not buffer:
break
out_file.write(buffer)
Для работы с аутентификацией и заголовками в urllib используется немного более сложный код, чем в requests:
from urllib.request import Request, urlopen
import base64
url = "https://example.com/protected/large_file.zip"
username = "user"
password = "password"
# Создание базовой авторизации
credentials = f"{username}:{password}"
encoded_credentials = base64.b64encode(credentials.encode('ascii')).decode('ascii')
headers = {
'Authorization': f'Basic {encoded_credentials}'
}
# Создание запроса
req = Request(url, headers=headers)
# Загрузка с использованием потокового подхода
with urlopen(req) as response, open("large_file.zip", "wb") as out_file:
while True:
buffer = response.read(8192)
if not buffer:
break
out_file.write(buffer)
Одно из преимуществ urllib — возможность точного контроля над процессом загрузки. Например, вы можете отслеживать скорость загрузки и показывать более детальную информацию:
import time
import sys
from urllib.request import urlopen
def download_with_progress(url, filename):
"""
Скачивает файл с отображением прогресса и скорости загрузки.
Args:
url (str): URL файла
filename (str): Имя для сохраняемого файла
"""
start_time = time.time()
downloaded = 0
block_size = 1024 * 8
with urlopen(url) as response, open(filename, 'wb') as out_file:
total_size = int(response.info().get('Content-Length', -1))
if total_size < 0:
print("Неизвестный размер файла")
while True:
before_read = time.time()
buffer = response.read(block_size)
if not buffer:
break
out_file.write(buffer)
downloaded += len(buffer)
# Расчет скорости загрузки
elapsed = time.time() – start_time
if elapsed > 0:
speed = downloaded / elapsed / 1024 # KB/s
if total_size > 0:
percent = downloaded * 100 / total_size
estimated = (total_size – downloaded) / (downloaded / elapsed) if downloaded > 0 else 0
sys.stdout.write(f"\r{percent:.1f}% | {downloaded/1024/1024:.1f} МБ из {total_size/1024/1024:.1f} МБ | {speed:.1f} КБ/с | Осталось: {estimated/60:.1f} мин")
else:
sys.stdout.write(f"\r{downloaded/1024/1024:.1f} МБ | {speed:.1f} КБ/с")
sys.stdout.flush()
print("\nЗагрузка завершена!")
total_time = time.time() – start_time
print(f"Скачано {downloaded/1024/1024:.1f} МБ за {total_time:.1f} сек. Средняя скорость: {downloaded/total_time/1024:.1f} КБ/с")
# Пример использования
download_with_progress("https://example.com/large-dataset.zip", "dataset.zip")
Для сложных сценариев urllib может быть расширен до работы с перенаправлениями, прокси и обработкой ошибок:
| Функциональность | urllib | requests |
|---|---|---|
| Встроена в стандартную библиотеку | Да ✓ | Нет ✗ |
| Управление размером буфера | Точный контроль ✓ | Ограниченный контроль ⚠️ |
| Поддержка прокси | Требует настройки ⚠️ | Простая настройка ✓ |
| Поддержка cookie | Требует CookieJar ⚠️ | Встроенная поддержка ✓ |
| Обработка перенаправлений | Требует дополнительной обработки ⚠️ | Автоматическая ✓ |
| Потребление памяти при загрузке больших файлов | Низкое (контролируемое размером буфера) ✓ | Выше (зависит от chunk_size) ⚠️ |
Для обеспечения надежности при загрузке больших файлов, рекомендуется реализовать возможность продолжения загрузки с определенной позиции в случае прерывания:
import os
from urllib.request import Request, urlopen
from urllib.error import URLError, HTTPError
def resume_download(url, filename):
"""
Скачивает файл с возможностью продолжения загрузки после прерывания.
Args:
url (str): URL файла
filename (str): Имя для сохраняемого файла
"""
block_size = 1024 * 8
downloaded = 0
# Проверяем, существует ли файл и какого он размера
file_size = 0
if os.path.exists(filename):
file_size = os.path.getsize(filename)
# Создаем заголовок для продолжения загрузки
headers = {}
if file_size > 0:
headers['Range'] = f'bytes={file_size}-'
print(f"Возобновление загрузки с {file_size} байт")
try:
req = Request(url, headers=headers)
with urlopen(req) as response:
# Проверяем поддержку продолжения загрузки
if file_size > 0 and response.getcode() != 206:
print("Сервер не поддерживает продолжение загрузки. Начинаем сначала.")
file_size = 0
total_size = int(response.info().get('Content-Length', -1)) + file_size
# Открываем файл для дозаписи или создания
mode = 'ab' if file_size > 0 else 'wb'
with open(filename, mode) as out_file:
while True:
buffer = response.read(block_size)
if not buffer:
break
out_file.write(buffer)
downloaded += len(buffer)
# Отображаем прогресс
current_size = file_size + downloaded
if total_size > 0:
percent = current_size * 100 / total_size
print(f"\r{percent:.1f}% | {current_size/1024/1024:.1f} МБ из {total_size/1024/1024:.1f} МБ", end="")
else:
print(f"\r{current_size/1024/1024:.1f} МБ скачано", end="")
print("\nЗагрузка завершена!")
except HTTPError as e:
print(f"HTTP Error: {e.code} – {url}")
except URLError as e:
print(f"URL Error: {e.reason} – {url}")
# Пример использования
resume_download("https://example.com/very-large-dataset.zip", "dataset.zip")
Библиотека urllib особенно полезна в проектах, где требуется минимум внешних зависимостей и точный контроль над процессом загрузки крупных файлов. Хотя ее API менее интуитивен по сравнению с requests, она обеспечивает более низкоуровневый доступ к сетевым операциям, что может быть критично в определенных сценариях использования. 💾
Автоматизация загрузки Python: обработка ошибок и таймауты
В реальных условиях загрузка файлов часто сталкивается с различными трудностями: нестабильное соединение, таймауты сервера, ограничения на количество запросов и другие проблемы. Грамотная обработка ошибок и настройка таймаутов — ключевые элементы надежного скрипта загрузки.
Рассмотрим основные типы ошибок при загрузке файлов и способы их обработки:
- Ошибки соединения — возникают при невозможности установить соединение с сервером
- Таймауты — происходят, когда сервер не отвечает в течение определенного времени
- HTTP-ошибки — коды состояния HTTP, указывающие на проблемы (404, 403, 500 и т.д.)
- Ошибки валидации SSL — связаны с проблемами сертификатов
- Ошибки прав доступа — возникают при попытке записать файл в защищенное место
Вот пример надежного скрипта загрузки с использованием requests, включающий обработку всех основных типов ошибок:
import requests
import time
import logging
import os
from requests.exceptions import RequestException, Timeout, TooManyRedirects, ConnectionError, HTTPError
# Настройка логирования
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s – %(name)s – %(levelname)s – %(message)s',
handlers=[
logging.FileHandler("download.log"),
logging.StreamHandler()
]
)
logger = logging.getLogger("download_manager")
def download_with_retry(url, filename, max_retries=5, timeout=(30, 300),
backoff_factor=0.5, verify_ssl=True, allow_redirects=True):
"""
Загружает файл с автоматическим повторением попыток при ошибках.
Args:
url (str): URL файла
filename (str): Путь для сохранения файла
max_retries (int): Максимальное число повторных попыток
timeout (tuple): Таймауты для соединения и чтения (в секундах)
backoff_factor (float): Множитель для экспоненциальной задержки между попытками
verify_ssl (bool): Проверять ли SSL сертификаты
allow_redirects (bool): Разрешать ли HTTP-перенаправления
Returns:
bool: True если загрузка успешна, False в случае ошибки
"""
logger.info(f"Начинаю загрузку {url} -> {filename}")
# Создаем директорию для файла если её нет
os.makedirs(os.path.dirname(os.path.abspath(filename)), exist_ok=True)
# Проверяем, существует ли файл и какого он размера
file_size = 0
if os.path.exists(filename):
file_size = os.path.getsize(filename)
logger.info(f"Файл уже существует, размер: {file_size} байт")
retry_count = 0
while retry_count <= max_retries:
try:
headers = {}
if file_size > 0:
headers['Range'] = f'bytes={file_size}-'
logger.info(f"Продолжаем загрузку с позиции {file_size}")
response = requests.get(
url,
stream=True,
timeout=timeout,
headers=headers,
verify=verify_ssl,
allow_redirects=allow_redirects
)
# Проверка HTTP-статуса
response.raise_for_status()
# Проверка поддержки продолжения загрузки
if file_size > 0 and response.status_code != 206:
logger.warning("Сервер не поддерживает продолжение загрузки. Начинаем сначала.")
file_size = 0
# Получаем общий размер файла
total_size = int(response.headers.get('content-length', 0))
if 'content-range' in response.headers:
content_range = response.headers['content-range']
total_size = int(content_range.split('/')[1])
else:
total_size += file_size
logger.info(f"Общий размер файла: {total_size} байт")
# Открываем файл для записи
mode = 'ab' if file_size > 0 and response.status_code == 206 else 'wb'
downloaded = 0
with open(filename, mode) as f:
start_time = time.time()
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
downloaded += len(chunk)
# Логируем прогресс каждые 5 МБ
if downloaded % (5 * 1024 * 1024) < 8192:
elapsed = time.time() – start_time
speed = downloaded / elapsed / 1024 if elapsed > 0 else 0
logger.info(f"Загружено: {downloaded + file_size} из {total_size} байт ({speed:.1f} КБ/с)")
logger.info(f"Загрузка завершена успешно: {url}")
return True
except Timeout as e:
retry_count += 1
wait_time = backoff_factor * (2 ** (retry_count – 1))
logger.warning(f"Таймаут при загрузке {url}. Повтор через {wait_time:.1f} секунд ({retry_count}/{max_retries})")
if retry_count <= max_retries:
time.sleep(wait_time)
except ConnectionError as e:
retry_count += 1
wait_time = backoff_factor * (2 ** (retry_count – 1))
logger.warning(f"Ошибка соединения при загрузке {url}: {str(e)}. Повтор через {wait_time:.1f} секунд ({retry_count}/{max_retries})")
if retry_count <= max_retries:
time.sleep(wait_time)
except HTTPError as e:
# Для некоторых HTTP-ошибок повтор не имеет смысла
if e.response.status_code in (404, 403, 401):
logger.error(f"HTTP-ошибка при загрузке {url}: {e.response.status_code} {e.response.reason}. Повтор невозможен.")
return False
retry_count += 1
wait_time = backoff_factor * (2 ** (retry_count – 1))
logger.warning(f"HTTP-ошибка при загрузке {url}: {e.response.status_code}. Повтор через {wait_time:.1f} секунд ({retry_count}/{max_retries})")
if retry_count <= max_retries:
time.sleep(wait_time)
except RequestException as e:
retry_count += 1
wait_time = backoff_factor * (2 ** (retry_count – 1))
logger.warning(f"Ошибка запроса при загрузке {url}: {str(e)}. Повтор через {wait_time:.1f} секунд ({retry_count}/{max_retries})")
if retry_count <= max_retries:
time.sleep(wait_time)
except Exception as e:
logger.error(f"Неожиданная ошибка при загрузке {url}: {str(e)}")
return False
logger.error(f"Не удалось загрузить {url} после {max_retries} попыток")
return False
# Пример использования
success = download_with_retry(
"https://example.com/large-file.zip",
"downloads/large-file.zip",
max_retries=3,
timeout=(30, 600), # 30 сек на соединение, 10 минут на загрузку
verify_ssl=True
)
if success:
logger.info("Файл успешно загружен")
else:
logger.error("Не удалось загрузить файл")
Для автоматизированных систем, которые должны загружать файлы по расписанию, важно настроить правильное поведение в случае ошибок. Вот несколько ключевых практик:
- Экспоненциальная задержка — увеличивайте время ожидания между попытками загрузки при повторяющихся ошибках
- Дифференцированная обработка ошибок — разные типы ошибок требуют разных стратегий повторения попыток
- Детальное логирование — записывайте все ошибки и предупреждения для последующего анализа
- Уведомления о критических ошибках — настройте отправку уведомлений при критических сбоях
- Проверка целостности скачанных файлов — использование контрольных сумм для валидации загрузки
Пример проверки целостности загруженного файла с использованием хеша MD5:
import hashlib
def verify_download(filename, expected_md5):
"""
Проверяет целостность загруженного файла по MD5-хешу.
Args:
filename (str): Путь к файлу
expected_md5 (str): Ожидаемый MD5-хеш
Returns:
bool: True если хеш совпадает, False в противном случае
"""
md5_hash = hashlib.md5()
with open(filename, "rb") as f:
# Читаем файл по частям чтобы не загружать его в память целиком
for chunk in iter(lambda: f.read(4096), b""):
md5_hash.update(chunk)
computed_hash = md5_hash.hexdigest()
if computed_hash != expected_md5:
logging.error(f"Ошибка проверки целостности для {filename}")
logging.error(f"Ожидаемый MD5: {expected_md5}")
logging.error(f"Полученный MD5: {computed_hash}")
return False
logging.info(f"Проверка целостности успешна: {filename}")
return True
При работе с загрузкой файлов также важно учитывать ограничения серверов. Многие серверы используют механизмы защиты от слишком частых запросов (rate limiting). Для обхода таких ограничений можно использовать задержки между запросами:
def download_multiple_files(url_list, save_dir, delay=1.0):
"""
Загружает несколько файлов с задержкой между запросами.
Args:
url_list (list): Список URL для загрузки
save_dir (str): Директория для сохранения файлов
delay (float): Задержка между запросами в секундах
Returns:
list: Список успешно загруженных файлов
"""
os.makedirs(save_dir, exist_ok=True)
successful_downloads = []
for i, url in enumerate(url_list):
filename = os.path.basename(url)
save_path = os.path.join(save_dir, filename)
logger.info(f"Загрузка файла {i+1}/{len(url_list)}: {url}")
success = download_with_retry(url, save_path)
if success:
successful_downloads.append(save_path)
logger.info(f"Успешно загружен: {save_path}")
else:
logger.error(f"Не удалось загрузить: {url}")
# Задержка перед следующим запросом
if i < len(url_list) – 1:
logger.info(f"Ждем {delay} секунд перед следующей загрузкой...")
time.sleep(delay)
return successful_downloads
Автоматизация загрузки файлов в Python требует тщательной обработки ошибок, настройки таймаутов и механизмов повторных попыток. Правильно реализованная система может работать надежно даже в условиях нестабильного соединения и других ограничений. 🔄
Продвинутые техники для массового скачивания файлов
Когда требуется загрузить сотни или тысячи файлов в кратчайшие сроки, стандартные подходы могут оказаться неэффективными. В таких случаях на помощь приходят продвинутые техники массового скачивания, включая асинхронные и многопоточные решения.
Рассмотрим несколько подходов к организации массовой загрузки файлов:
- Многопоточность (threading) — использование нескольких потоков для параллельной загрузки
- Мультипроцессинг (multiprocessing) — распределение загрузки между несколькими процессами
- Асинхронный подход (asyncio) — неблокирующие операции ввода-вывода для эффективного использования ресурсов
- Комбинированные стратегии — сочетание различных подходов для максимальной производительности
Начнем с многопоточного решения, которое хорошо работает для операций ввода-вывода, таких как загрузка файлов:
import requests
import os
import time
import logging
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urlparse
logging.basicConfig(level=logging.INFO, format='%(asctime)s – %(name)s – %(levelname)s – %(message)s')
logger = logging.getLogger("mass_downloader")
def download_file(url, save_dir):
"""
Загружает один файл.
Args:
url (str): URL файла
save_dir (str): Директория для сохранения
Returns:
tuple: (успех, путь к файлу, время загрузки)
"""
start_time = time.time()
filename = os.path.basename(urlparse(url).path)
if not filename: # Если URL не содержит имени файла
filename = f"file_{int(start_time)}.dat"
save_path = os.path.join(save_dir, filename)
try:
response = requests.get(url, stream=True, timeout=(30, 300))
response.raise_for_status()
with open(save_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
elapsed = time.time() – start_time
logger.info(f"Загружен {url} -> {save_path} за {elapsed:.2f} сек")
return True, save_path, elapsed
except Exception as e:
logger.error(f"Ошибка при загрузке {url}: {str(e)}")
return False, None, time.time() – start_time
def mass_download_threaded(url_list, save_dir, max_workers=10):
"""
Загружает множество файлов параллельно, используя пул потоков.
Args:
url_list (list): Список URL для загрузки
save_dir (str): Директория для сохранения файлов
max_workers (int): Максимальное количество параллельных потоков
Returns:
tuple: (успешные загрузки, неудачные загрузки, общее время)
"""
os.makedirs(save_dir, exist_ok=True)
successful = []
failed = []
total_start_time = time.time()
logger.info(f"Начинаю загрузку {len(url_list)} файлов с использованием {max_workers} потоков")
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# Отправляем все задачи на выполнение
future_to_url = {
executor.submit(download_file, url, save_dir): url
for url in url_list
}
# Обрабатываем результаты по мере их завершения
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
success, save_path, elapsed = future.result()
if success:
successful.append((url, save_path, elapsed))
else:
failed.append(url)
except Exception as e:
logger.error(f"Исключение при обработке {url}: {str(e)}")
failed.append(url)
total_elapsed = time.time() – total_start_time
# Выводим статистику
logger.info(f"Загрузка завершена за {total_elapsed:.2f} секунд")
logger.info(f"Успешно загружено: {len(successful)}/{len(url_list)} файлов")
logger.info(f"Общий размер: {sum(os.path.getsize(path) for _, path, _ in successful) / (1024*1024):.2f} МБ")
if failed:
logger.warning(f"Не удалось загрузить {len(failed)} файлов")
return successful, failed, total_elapsed
# Пример использования
urls = [
"https://example.com/file1.pdf",
"https://example.com/file2.jpg",
"https://example.com/file3.zip",
# ... добавьте больше URL
]
successful, failed, elapsed = mass_download_threaded(urls, "downloads", max_workers=5)
Для еще большей эффективности, особенно при работе с большим количеством файлов, можно использовать асинхронный подход с библиотекой aiohttp:
import aiohttp
import asyncio
import os
import time
import logging
from urllib.parse import urlparse
logging.basicConfig(level=logging.INFO, format='%(asctime)s – %(name)s – %(levelname)s – %(message)s')
logger = logging.getLogger("async_downloader")
async def fetch_file(session, url, save_dir):
"""
Асинхронно загружает файл.
Args:
session (aiohttp.ClientSession): Сессия aiohttp
url (str): URL файла
save_dir (str): Директория для сохранения
Returns:
tuple: (успех, путь к файлу или None, время загрузки)
"""
start_time = time.time()
filename = os.path.basename(urlparse(url).path)
if not filename:
filename = f"file_{int(start_time)}.dat"
save_path = os.path.join(save_dir, filename)
try:
async with session.get(url) as response:
if response.status != 200:
logger.error(f"Ошибка HTTP {response.status} для {url}")
return False, None, time.time() – start_time
with open(save_path, 'wb') as f:
while True:
chunk = await response.content.read(8192)
if not chunk:
break
f.write(chunk)
elapsed = time.time() – start_time
logger.info(f"Загружен {url} -> {save_path} за {elapsed:.2f} сек")
return True, save_path, elapsed
except Exception as e:
logger.error(f"Ошибка при загрузке {url}: {str(e)}")
return False, None, time.time() – start_time
async def mass_download_async(url_list, save_dir, max_concurrent=10):
"""
Асинхронно загружает множество файлов с ограничением параллельных запросов.
Args:
url_list (list): Список URL для загрузки
save_dir (str): Директория для сохранения файлов
max_concurrent (int): Максимальное количество параллельных запросов
Returns:
tuple: (успешные загрузки, неудачные загрузки, общее время)
"""
os.makedirs(save_dir, exist_ok=True)
successful = []
failed = []
total_start_time = time.time()
# Используем семафор для ограничения числа одновременных запросов
semaphore = asyncio.Semaphore(max_concurrent)
# Оборачиваем каждую задачу в семафор
async def fetch_with_semaphore(url):
async with semaphore:
return await fetch_file(session, url, save_dir)
logger.info(f"Начинаю асинхронную загрузку {len(url_list)} файлов с лимитом {max_concurrent}")
# Создаем клиентскую сессию
conn = aiohttp.TCPConnector(limit=None, ttl_dns_cache=300)
async with aiohttp.ClientSession(connector=conn) as session:
# Создаем задачи для всех URL
tasks = [fetch_with_semaphore(url) for url in url_list]
# Выполняем все задачи асинхронно
results = await asyncio.gather(*tasks, return_exceptions=True)
# Обрабатываем результаты
for i, result in enumerate(results):
if isinstance(result, Exception):
logger.error(f"Исключение для {url_list[i]}: {str(result)}")
failed.append(url_list[i])
else:
success, path, elapsed = result
if success:
successful.append((url_list[i], path, elapsed))
else:
failed.append(url_list[i])
total_elapsed = time.time() – total_start_time
# Выводим статистику
logger.info(f"Асинхронная загрузка завершена за {total_elapsed:.2f} секунд")
logger.info(f"Успешно загружено: {len(successful)}/{len(url_list)} файлов")
if failed:
logger.warning(f"Не удалось загрузить {len(failed)} файлов")
return successful, failed, total_elapsed
# Пример использования
async def main():
urls = [
"https://example.com/file1.pdf",
"https://example.com/file2.jpg",
"https://example.com/file3.zip",
# ... добавьте больше URL
]
successful, failed, elapsed = await mass_download_async(urls, "async_downloads", max_concurrent=10)
print(f"Загрузка завершена за {elapsed:.2f} секунд")
print(f"Успешно: {len(successful)}, неудачно: {len(failed)}")
# Запуск асинхронной программы
if __name__ == "__main__":
asyncio.run(main())
Для очень крупных проектов можно использовать гибридный подход, сочетающий асинхронность и многопроцессорность:
import aiohttp
import asyncio
import os
import time
import logging
import multiprocessing
from functools import partial
logging.basicConfig(level=logging.INFO, format='%(asctime)s – %(name)s – %(levelname)s – %(message)s')
logger = logging.getLogger("hybrid_downloader")
# Асинхронные функции для загрузки файлов (как в предыдущем примере)
# ...
def process_batch(url_batch, save_dir, max_concurrent):
"""
Функция для обработки пакета URL в отдельном процессе.
Args:
url_batch (list): Пакет URL для загрузки
save_dir (str): Директория для сохранения
max_concurrent (int): Максимальное количество параллельных запросов
Returns:
tuple: (успешные загрузки, неудачные загрузки, время)
"""
# Создаем отдельный цикл событий для этого процесса
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# Запускаем асинхронную загрузку
return loop.run_until_complete(mass_download_async(url_batch, save_dir, max_concurrent))
def hybrid_mass_download(url_list, save_dir, process_count=None, max_concurrent_per_process=10):
"""
Гибридный подход: разделяет URL между процессами, каждый из которых
выполняет асинхронную загрузку.
Args:
url_list (list): Список URL для загрузки
save_dir (str): Директория для сохранения файлов
process_count (int): Количество процессов (по умолчанию – количество ядер CPU)
max_concurrent_per_process (int): Лимит одновременных соединений на процесс
Returns:
tuple: (успешные загрузки, неудачные загрузки, общее время)
"""
if process_count is None:
process_count = multiprocessing.cpu_count()
# Ограничиваем количество процессов разумным значением
process_count = min(process_count, len(url_list), multiprocessing.cpu_count() * 2)
os.mak