Python в облачных сервисах: автоматизация AWS, GCP и Azure
Для кого эта статья:
- Разработчики и IT-специалисты, интересующиеся облачными вычислениями
- Начинающие и опытные программисты, желающие освоить Python для автоматизации облачных задач
DevOps-инженеры, разрабатывающие и управляющие облачной инфраструктурой
Python стал негласным стандартом для разработчиков, погружающихся в мир облачных вычислений. Его гибкость и мощные библиотеки позволяют превращать сложные облачные операции в элегантные скрипты из нескольких строк кода. Масштабирование инфраструктуры, которое раньше занимало часы ручной работы, теперь автоматизируется за минуты. Большинство IT-инженеров не осознают потенциал Python, используя его лишь для базовых задач, в то время как правильно написанные скрипты могут стать настоящим секретным оружием в управлении облачными ресурсами. 🚀
Хотите освоить Python для работы с облачными сервисами профессионально? Обучение Python-разработке от Skypro — это идеальный старт! На курсе вы не только изучите основы языка, но и погрузитесь в практику взаимодействия с AWS, Google Cloud и Azure. Наши студенты создают реальные проекты по автоматизации облачных задач и получают востребованные навыки, которые моментально применяют в работе.
Основы Python для работы с облачными сервисами
Успешная работа с облачными сервисами через Python начинается с понимания ключевых концепций взаимодействия. Облачные провайдеры предоставляют API, которые становятся мостом между вашим кодом и их инфраструктурой. Python превосходно подходит для этой задачи благодаря своей читаемости и богатой экосистеме библиотек.
Прежде чем погружаться в код, необходимо настроить окружение. Установите Python (рекомендуется версия 3.8 или выше) и создайте виртуальное окружение для изоляции зависимостей проекта:
python -m venv cloud-env
source cloud-env/bin/activate # для Linux/Mac
cloud-env\Scripts\activate # для Windows
Следующий шаг — установка соответствующих SDK (Software Development Kit) для выбранного облачного провайдера:
# Для AWS
pip install boto3
# Для Google Cloud
pip install google-cloud
# Для Azure
pip install azure-sdk
Аутентификация — краеугольный камень взаимодействия с облачными сервисами. Каждый провайдер имеет свои механизмы:
| Провайдер | Метод аутентификации | Файл конфигурации |
|---|---|---|
| AWS | Access Key + Secret Key | ~/.aws/credentials |
| Google Cloud | Service Account Key | JSON файл ключа |
| Azure | Service Principal | Переменные окружения |
Базовый шаблон для работы с облачными сервисами обычно включает три этапа:
- Инициализация клиента с учетными данными
- Формирование запроса к нужному сервису
- Обработка ответа
Например, для AWS это выглядит так:
import boto3
# Инициализация клиента
s3 = boto3.client('s3')
# Формирование запроса
response = s3.list_buckets()
# Обработка ответа
for bucket in response['Buckets']:
print(f"Bucket Name: {bucket['Name']}")
Для эффективной работы важно следовать нескольким принципам:
- Всегда обрабатывайте исключения при работе с облачными API
- Используйте файлы конфигурации вместо хардкода учетных данных
- Применяйте пагинацию для запросов, возвращающих большие наборы данных
- Реализуйте механизмы повторных попыток для нестабильных сетевых соединений
Александр Петров, DevOps-инженер
В прошлом году наша команда столкнулась с проблемой: сотни EC2-инстансов в AWS требовали регулярного мониторинга и обновления тегов. Ручное управление занимало около 8 часов еженедельно. Я написал Python-скрипт с использованием boto3, который автоматически проверял состояние инстансов, обновлял теги и отправлял отчеты. Интересно, что первая версия скрипта содержала фатальную ошибку — он не обрабатывал исключения при сетевых сбоях и просто "падал".
Добавив всего несколько строк с try-except и механизмом повторных попыток, мы превратили нестабильное решение в надежный инструмент. Сейчас скрипт запускается по расписанию, экономя команде более 30 часов ежемесячно. Ключевой урок: даже простая обработка исключений может радикально повысить надежность облачной автоматизации.

Библиотеки Python для AWS, Google Cloud и Azure
Каждый крупный облачный провайдер предлагает официальные Python-библиотеки, обеспечивающие программный доступ к своим сервисам. Рассмотрим особенности и возможности основных SDK. 🔍
AWS: Boto3
Boto3 — официальный SDK для AWS, предоставляющий объектно-ориентированный API и низкоуровневый клиент для сервисов Amazon. Библиотека выделяется своей зрелостью и обширной документацией.
Установка и базовая настройка:
pip install boto3
# Конфигурация через код
import boto3
session = boto3.Session(
aws_access_key_id='YOUR_ACCESS_KEY',
aws_secret_access_key='YOUR_SECRET_KEY',
region_name='us-west-2'
)
Boto3 предлагает два стиля взаимодействия с AWS:
- Клиентский интерфейс — низкоуровневый API, точно отображающий HTTP-запросы к сервисам AWS
- Ресурсный интерфейс — высокоуровневый объектно-ориентированный API, абстрагирующий детали взаимодействия
Пример использования обоих интерфейсов для работы с S3:
# Клиентский интерфейс
s3_client = session.client('s3')
response = s3_client.list_buckets()
print(response['Buckets'])
# Ресурсный интерфейс
s3_resource = session.resource('s3')
for bucket in s3_resource.buckets.all():
print(bucket.name)
Google Cloud: google-cloud-python
Google Cloud предлагает набор Python-библиотек, каждая из которых соответствует определенному сервису. Центральный пакет — google-cloud-core, но для работы с конкретными сервисами требуется установка соответствующих библиотек.
pip install google-cloud-storage # для работы с Cloud Storage
pip install google-cloud-bigquery # для работы с BigQuery
Аутентификация в Google Cloud обычно осуществляется через файл ключа сервисного аккаунта:
from google.cloud import storage
# Аутентификация через переменную окружения
# export GOOGLE_APPLICATION_CREDENTIALS="path/to/service-account-key.json"
# Или явно через код
client = storage.Client.from_service_account_json('path/to/service-account-key.json')
# Работа с бакетами
buckets = list(client.list_buckets())
for bucket in buckets:
print(bucket.name)
Azure: Azure SDK for Python
Microsoft Azure предоставляет модульный SDK для Python, состоящий из множества пакетов, соответствующих различным сервисам. Каждый пакет можно устанавливать отдельно:
pip install azure-storage-blob # для работы с Blob Storage
pip install azure-mgmt-compute # для управления виртуальными машинами
Аутентификация в Azure имеет несколько методов:
from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient
# Использование DefaultAzureCredential (ищет креды в окружении)
credential = DefaultAzureCredential()
blob_service = BlobServiceClient(
account_url="https://mystorageaccount.blob.core.windows.net",
credential=credential
)
# Работа с контейнерами
containers = blob_service.list_containers()
for container in containers:
print(container.name)
Сравнительная таблица особенностей Python SDK для основных облачных провайдеров:
| Характеристика | AWS (Boto3) | Google Cloud | Azure |
|---|---|---|---|
| Структура SDK | Монолитная | Модульная | Модульная |
| Стиль API | Клиентский и ресурсный | Объектно-ориентированный | Объектно-ориентированный |
| Асинхронная поддержка | Ограниченная | Через отдельные библиотеки | Встроенная |
| Типизация | Динамическая | Статические подсказки типов | Полная поддержка типов |
| Документация | Обширная | Детальная | Комплексная |
Автоматизация облачных задач с помощью Python-скриптов
Автоматизация рутинных задач — одно из главных преимуществ использования Python с облачными сервисами. Грамотно написанные скрипты экономят время, снижают вероятность человеческих ошибок и обеспечивают стабильность инфраструктуры. 🤖
Рассмотрим практические примеры автоматизации типовых задач для разных облачных провайдеров.
Массовое управление EC2-инстансами в AWS
Задача: запустить или остановить группу EC2-инстансов по заданным тегам.
import boto3
import argparse
def manage_instances_by_tag(tag_key, tag_value, action):
ec2 = boto3.resource('ec2')
# Фильтруем инстансы по тегу
instances = ec2.instances.filter(
Filters=[
{'Name': f'tag:{tag_key}', 'Values': [tag_value]},
]
)
instance_ids = [instance.id for instance in instances]
if not instance_ids:
print(f"No instances found with tag {tag_key}={tag_value}")
return
# Выполняем действие
if action == 'start':
ec2.instances.filter(InstanceIds=instance_ids).start()
print(f"Started {len(instance_ids)} instances")
elif action == 'stop':
ec2.instances.filter(InstanceIds=instance_ids).stop()
print(f"Stopped {len(instance_ids)} instances")
else:
print(f"Unknown action: {action}")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Manage EC2 instances by tag')
parser.add_argument('--tag-key', required=True, help='Tag key to filter by')
parser.add_argument('--tag-value', required=True, help='Tag value to filter by')
parser.add_argument('--action', choices=['start', 'stop'], required=True, help='Action to perform')
args = parser.parse_args()
manage_instances_by_tag(args.tag_key, args.tag_value, args.action)
Этот скрипт можно запускать из командной строки:
python manage_ec2.py --tag-key Environment --tag-value Dev --action stop
Ротация бэкапов в Google Cloud Storage
Задача: автоматически удалять старые бэкапы, оставляя только N последних версий.
from google.cloud import storage
from datetime import datetime, timedelta
import os
def cleanup_old_backups(bucket_name, prefix, days_to_keep):
client = storage.Client()
bucket = client.get_bucket(bucket_name)
# Получаем текущую дату
current_date = datetime.now()
cutoff_date = current_date – timedelta(days=days_to_keep)
# Получаем список объектов с префиксом
blobs = bucket.list_blobs(prefix=prefix)
deleted_count = 0
for blob in blobs:
# Получаем дату создания объекта
creation_date = blob.time_created.replace(tzinfo=None)
if creation_date < cutoff_date:
print(f"Deleting {blob.name}, created on {creation_date}")
blob.delete()
deleted_count += 1
print(f"Cleanup completed. Deleted {deleted_count} old backups.")
if __name__ == "__main__":
# Эти значения можно передавать через аргументы командной строки или env переменные
bucket_name = os.environ.get('BACKUP_BUCKET', 'my-backup-bucket')
prefix = os.environ.get('BACKUP_PREFIX', 'database/daily/')
days_to_keep = int(os.environ.get('DAYS_TO_KEEP', '30'))
cleanup_old_backups(bucket_name, prefix, days_to_keep)
Мониторинг потребления ресурсов в Azure
Задача: собрать статистику использования виртуальных машин и отправить отчет.
from azure.identity import DefaultAzureCredential
from azure.mgmt.compute import ComputeManagementClient
from azure.mgmt.monitor import MonitorManagementClient
import datetime
import pandas as pd
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
def get_vm_usage_statistics(subscription_id, days=7):
# Аутентификация
credential = DefaultAzureCredential()
# Инициализируем клиенты
compute_client = ComputeManagementClient(credential, subscription_id)
monitor_client = MonitorManagementClient(credential, subscription_id)
# Получаем список VM
vms = compute_client.virtual_machines.list_all()
# Настройка периода для метрик
end_time = datetime.datetime.now()
start_time = end_time – datetime.timedelta(days=days)
# Собираем статистику
stats = []
for vm in vms:
resource_id = vm.id
resource_group = resource_id.split('/')[4]
vm_name = vm.name
vm_size = vm.hardware_profile.vm_size
# Получаем метрики CPU за период
cpu_metrics = monitor_client.metrics.list(
resource_id,
timespan=f"{start_time.isoformat()}/{end_time.isoformat()}",
interval='PT1H',
metricnames='Percentage CPU',
aggregation='Average'
)
# Вычисляем среднее значение CPU
cpu_values = []
for item in cpu_metrics.value[0].timeseries:
for data in item.data:
if data.average is not None:
cpu_values.append(data.average)
avg_cpu = sum(cpu_values) / len(cpu_values) if cpu_values else 0
stats.append({
'VM Name': vm_name,
'Resource Group': resource_group,
'Size': vm_size,
'Avg CPU (%)': round(avg_cpu, 2)
})
return stats
def send_email_report(stats, recipient, smtp_server, smtp_port, sender, password):
# Создаем DataFrame и HTML-таблицу
df = pd.DataFrame(stats)
df = df.sort_values('Avg CPU (%)', ascending=False)
html_table = df.to_html(index=False)
# Формируем email
msg = MIMEMultipart()
msg['Subject'] = f'Azure VM Usage Report – {datetime.datetime.now().strftime("%Y-%m-%d")}'
msg['From'] = sender
msg['To'] = recipient
body = f"""
<html>
<head>
<style>
table {{ border-collapse: collapse; width: 100%; }}
th, td {{ text-align: left; padding: 8px; border: 1px solid #ddd; }}
tr:nth-child(even) {{ background-color: #f2f2f2; }}
th {{ background-color: #4CAF50; color: white; }}
</style>
</head>
<body>
<h2>VM Usage Report</h2>
{html_table}
</body>
</html>
"""
msg.attach(MIMEText(body, 'html'))
# Отправляем email
with smtplib.SMTP(smtp_server, smtp_port) as server:
server.starttls()
server.login(sender, password)
server.send_message(msg)
print(f"Report sent to {recipient}")
if __name__ == "__main__":
subscription_id = 'your-subscription-id'
stats = get_vm_usage_statistics(subscription_id)
# Эти данные лучше хранить в защищенном виде, например, в Key Vault
send_email_report(
stats,
recipient='admin@example.com',
smtp_server='smtp.office365.com',
smtp_port=587,
sender='reports@example.com',
password='your-password'
)
Типичные сценарии автоматизации облачных задач включают:
- Ежедневное резервное копирование данных в облачное хранилище
- Масштабирование ресурсов на основе метрик нагрузки
- Проверка соответствия ресурсов политикам безопасности
- Генерация отчетов о затратах и использовании ресурсов
- Автоматизированное развертывание и обновление инфраструктуры
Ирина Соколова, Cloud Solution Architect
Однажды я консультировала стартап, который тратил почти 30% своего облачного бюджета на неиспользуемые ресурсы. Их команда разработки создавала тестовые окружения и забывала выключать их после работы. Мы написали простой скрипт на Python, который каждый вечер проверял активность инстансов по специальным тегам и автоматически выключал неиспользуемые.
Самое интересное произошло через неделю после внедрения: мы добавили в скрипт отправку ежедневного отчета, показывающего, кто и какие ресурсы оставил включенными, и сколько денег это стоило бы без автоматического выключения. Эффект оказался неожиданным — разработчики начали соревноваться, кто меньше раз попадет в "список расточителей". Через месяц экономия составила 22% от общего облачного бюджета компании, а команда приобрела полезную привычку ответственного отношения к ресурсам.
Управление ресурсами облака через Python API
Управление облачными ресурсами через API — мощный инструмент для создания, изменения и мониторинга инфраструктуры программным путем. Python предоставляет элегантные абстракции для работы с низкоуровневыми API, делая процесс интуитивно понятным даже для начинающих разработчиков. 🛠️
Рассмотрим ключевые аспекты управления ресурсами через Python API для основных облачных провайдеров.
Управление сетевой инфраструктурой в AWS
Настройка VPC, подсетей и групп безопасности — фундамент для большинства облачных развертываний:
import boto3
import uuid
def create_network_infrastructure(vpc_cidr, region):
ec2 = boto3.client('ec2', region_name=region)
# Создаем VPC
vpc_response = ec2.create_vpc(CidrBlock=vpc_cidr)
vpc_id = vpc_response['Vpc']['VpcId']
# Добавляем тег для идентификации
ec2.create_tags(
Resources=[vpc_id],
Tags=[{'Key': 'Name', 'Value': f'VPC-{str(uuid.uuid4())[:8]}'}]
)
# Создаем публичную подсеть
public_subnet_response = ec2.create_subnet(
VpcId=vpc_id,
CidrBlock=vpc_cidr.replace('0/16', '0/24'),
AvailabilityZone=f'{region}a'
)
public_subnet_id = public_subnet_response['Subnet']['SubnetId']
ec2.create_tags(
Resources=[public_subnet_id],
Tags=[{'Key': 'Name', 'Value': 'Public-Subnet'}]
)
# Создаем Internet Gateway
igw_response = ec2.create_internet_gateway()
igw_id = igw_response['InternetGateway']['InternetGatewayId']
# Присоединяем IGW к VPC
ec2.attach_internet_gateway(
InternetGatewayId=igw_id,
VpcId=vpc_id
)
# Создаем таблицу маршрутизации
route_table_response = ec2.create_route_table(VpcId=vpc_id)
route_table_id = route_table_response['RouteTable']['RouteTableId']
# Добавляем маршрут в интернет
ec2.create_route(
RouteTableId=route_table_id,
DestinationCidrBlock='0.0.0.0/0',
GatewayId=igw_id
)
# Ассоциируем таблицу маршрутизации с подсетью
ec2.associate_route_table(
RouteTableId=route_table_id,
SubnetId=public_subnet_id
)
# Создаем группу безопасности
sg_response = ec2.create_security_group(
GroupName='WebServerSG',
Description='Security group for web servers',
VpcId=vpc_id
)
sg_id = sg_response['GroupId']
# Разрешаем входящий HTTP и SSH трафик
ec2.authorize_security_group_ingress(
GroupId=sg_id,
IpPermissions=[
{
'IpProtocol': 'tcp',
'FromPort': 80,
'ToPort': 80,
'IpRanges': [{'CidrIp': '0.0.0.0/0'}]
},
{
'IpProtocol': 'tcp',
'FromPort': 22,
'ToPort': 22,
'IpRanges': [{'CidrIp': '0.0.0.0/0'}]
}
]
)
return {
'vpc_id': vpc_id,
'subnet_id': public_subnet_id,
'security_group_id': sg_id
}
# Пример использования
network = create_network_infrastructure('10.0.0.0/16', 'us-west-2')
print(f"Created VPC: {network['vpc_id']}")
print(f"Created Subnet: {network['subnet_id']}")
print(f"Created Security Group: {network['security_group_id']}")
Управление хранилищем в Google Cloud
Cloud Storage — распространенный сервис для хранения объектов. Вот как создавать бакеты и управлять файлами:
from google.cloud import storage
import os
def manage_cloud_storage(project_id, location):
# Инициализация клиента
storage_client = storage.Client(project=project_id)
# Создаем уникальное имя для бакета
bucket_name = f"{project_id}-storage-{os.urandom(4).hex()}"
# Создаем новый бакет
bucket = storage_client.create_bucket(
bucket_name,
location=location
)
print(f"Bucket {bucket_name} created in {location}")
# Загружаем файл в бакет
def upload_file(source_file_path, destination_blob_name):
blob = bucket.blob(destination_blob_name)
# Загружаем файл
blob.upload_from_filename(source_file_path)
# Делаем файл публично доступным
blob.make_public()
print(f"File {source_file_path} uploaded to {destination_blob_name}")
print(f"Public URL: {blob.public_url}")
return blob.public_url
# Пример загрузки файла
public_url = upload_file('example.txt', 'documents/example.txt')
# Получаем список всех объектов в бакете
def list_blobs():
blobs = storage_client.list_blobs(bucket_name)
for blob in blobs:
print(f"File: {blob.name}, Size: {blob.size} bytes")
list_blobs()
# Устанавливаем политику жизненного цикла
def set_lifecycle_policy():
bucket.lifecycle_rules = [
{
'action': {'type': 'Delete'},
'condition': {'age': 30} # Удаление файлов старше 30 дней
},
{
'action': {'type': 'SetStorageClass', 'storageClass': 'NEARLINE'},
'condition': {'age': 10} # Перенос в Nearline для файлов старше 10 дней
}
]
bucket.patch()
print("Lifecycle policy set")
set_lifecycle_policy()
return {
'bucket_name': bucket_name,
'public_url': public_url
}
# Пример использования
result = manage_cloud_storage('my-project-id', 'us-central1')
print(f"Storage management completed for bucket: {result['bucket_name']}")
Управление вычислительными ресурсами в Azure
Создание и настройка виртуальных машин в Azure с помощью Python:
from azure.identity import DefaultAzureCredential
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.network import NetworkManagementClient
from azure.mgmt.compute import ComputeManagementClient
import os
def create_vm_infrastructure(subscription_id, resource_group, location, vm_name, admin_username, admin_password):
# Аутентификация
credential = DefaultAzureCredential()
# Инициализация клиентов
resource_client = ResourceManagementClient(credential, subscription_id)
network_client = NetworkManagementClient(credential, subscription_id)
compute_client = ComputeManagementClient(credential, subscription_id)
# Создаем ресурсную группу, если её нет
rg_result = resource_client.resource_groups.create_or_update(
resource_group,
{
"location": location
}
)
print(f"Resource group {rg_result.name} created or updated")
# Создаем виртуальную сеть
vnet_name = f"{vm_name}-vnet"
subnet_name = f"{vm_name}-subnet"
ip_name = f"{vm_name}-ip"
nic_name = f"{vm_name}-nic"
poller = network_client.virtual_networks.begin_create_or_update(
resource_group,
vnet_name,
{
"location": location,
"address_space": {
"address_prefixes": ["10.0.0.0/16"]
}
}
)
vnet_result = poller.result()
print(f"Virtual network {vnet_result.name} created")
# Создаем подсеть
poller = network_client.subnets.begin_create_or_update(
resource_group,
vnet_name,
subnet_name,
{
"address_prefix": "10.0.0.0/24"
}
)
subnet_result = poller.result()
print(f"Subnet {subnet_result.name} created")
# Создаем публичный IP-адрес
poller = network_client.public_ip_addresses.begin_create_or_update(
resource_group,
ip_name,
{
"location": location,
"sku": { "name": "Standard" },
"public_ip_allocation_method": "Static",
"public_ip_address_version": "IPV4"
}
)
ip_result = poller.result()
print(f"Public IP address {ip_result.name} created")
# Создаем сетевой интерфейс
poller = network_client.network_interfaces.begin_create_or_update(
resource_group,
nic_name,
{
"location": location,
"ip_configurations": [
{
"name": "ipconfig1",
"subnet": { "id": subnet_result.id },
"public_ip_address": { "id": ip_result.id }
}
]
}
)
nic_result = poller.result()
print(f"Network interface {nic_result.name} created")
# Создаем виртуальную машину
poller = compute_client.virtual_machines.begin_create_or_update(
resource_group,
vm_name,
{
"location": location,
"storage_profile": {
"image_reference": {
"publisher": "Canonical",
"offer": "UbuntuServer",
"sku": "18.04-LTS",
"version": "latest"
}
},
"hardware_profile": {
"vm_size": "Standard_DS1_v2"
},
"os_profile": {
"computer_name": vm_name,
"admin_username": admin_username,
"admin_password": admin_password
},
"network_profile": {
"network_interfaces": [
{
"id": nic_result.id
}
]
}
}
)
vm_result = poller.result()
print(f"Virtual machine {vm_result.name} created")
# Получаем данные о созданной VM
return {
"vm_name": vm_result.name,
"public_ip": ip_result.ip_address,
"resource_group": resource_group,
"location": location
}
# Пример использования
vm_info = create_vm_infrastructure(
subscription_id="your-subscription-id",
resource_group="example-resources",
location="eastus",
vm_name="example-vm",
admin_username="azureuser",
admin_password="ComplexPassword!234" # В реальных сценариях лучше использовать Key Vault
)
print(f"VM created: {vm_info['vm_name']}")
print(f"Public IP: {vm_info['public_ip']}")
Лучшие практики работы с облачными API
При работе с облачными API через Python важно следовать определенным принципам для обеспечения надежности и безопасности:
- Управление учетными данными: никогда не хардкодите ключи и секреты в коде. Используйте переменные окружения, файлы конфигурации или специализированные сервисы управления секретами.
- Обработка ошибок: всегда реализуйте корректную обработку исключений при работе с API, учитывая временные сбои сети и ограничения сервисов.
- Идемпотентность: разрабатывайте операции так, чтобы их можно было безопасно повторять без нежелательных побочных эффектов.
- Тестирование: используйте эмуляторы и mocking для тестирования кода без затрат на реальные облачные ресурсы.
- Логирование: реализуйте подробное логирование всех операций с API для упрощения отладки и аудита.
Готовые решения Python для облачных сервисов с кодом
Помимо базовых SDK, существует множество готовых Python-библиотек и фреймворков, упрощающих работу с облачными сервисами. Они обеспечивают более высокий уровень абстракции и автоматизируют распространенные сценарии. 📦
Pulumi: инфраструктура как код на Python
Pulumi — открытый инструмент, позволяющий описывать облачную инфраструктуру на Python, JavaScript, TypeScript, Go и других языках программирования.
Установка Pulumi:
pip install pulumi
pip install pulumi-aws # или pulumi-azure, pulumi-gcp
Пример создания S3 бакета с настройкой статического хостинга:
import pulumi
import pulumi_aws as aws
# Создаем S3 бакет для хостинга статического сайта
website_bucket = aws.s3.Bucket("website-bucket",
website={
"index_document": "index.html",
"error_document": "error.html",
},
acl="public-read" # Делаем бакет публично доступным
)
# Создаем политику доступа
bucket_policy = aws.s3.BucketPolicy("bucket-policy",
bucket=website_bucket.id,
policy=website_bucket.id.apply(lambda id: f'''{{
"Version": "2012-10-17",
"Statement": [
{{
"Effect": "Allow",
"Principal": "*",
"Action": "s3:GetObject",
"Resource": "arn:aws:s3:::{id}/*"
}}
]
}}''')
)
# Загружаем HTML-файл в бакет
index_html = aws.s3.BucketObject("index.html",
bucket=website_bucket.id,
content="""
<!DOCTYPE html>
<html>
<head>
<title>Hello, Pulumi!</title>
</head>
<body>
<h1>Hello, Pulumi!</h1>
<p>This site was deployed using Python and Pulumi.</p>
</body>
</html>
""",
content_type="text/html",
acl="public-read"
)
# Экспортируем URL статического сайта
pulumi.export("website_url", website_bucket.website_endpoint)
Pulumi поддерживает все основные облачные провайдеры и позволяет создавать многооблачные решения с единым подходом.
Apache Airflow: оркестрация облачных задач
Apache Airflow — платформа для программного создания, планирования и мониторинга рабочих процессов. Она отлично подходит для оркестрации облачных задач.
Пример DAG для ежедневного обновления данных в BigQuery:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import (
BigQueryCheckOperator,
BigQueryCreateExternalTableOperator,
BigQueryExecuteQueryOperator
)
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
default_args = {
'owner': 'data_team',
'depends_on_past': False,
'email': ['alerts@example.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
with DAG(
'daily_data_processing',
default_args=default_args,
description='A DAG to process daily data',
schedule_interval=timedelta(days=1),
start_date=datetime(2023, 1, 1),
catchup=False,
tags=['example'],
) as dag:
# Проверяем, что исходная таблица существует
check_source_table = BigQueryCheckOperator(
task_id='check_source_table',
sql=f'SELECT COUNT(*) FROM `project.dataset.source_table`',
use_legacy_sql=False
)
# Загружаем новые данные из GCS в промежуточную таблицу
load_new_data = GCSToBigQueryOperator(
task_id='load_new_data',
bucket='data-bucket',
source_objects=['daily/{{ ds }}/data.csv'],
destination_project_dataset_table='project.dataset.temp_table',
schema_fields=[
{'name': 'id', 'type': 'STRING', 'mode': 'REQUIRED'},
{'name': 'value', 'type': 'FLOAT', 'mode': 'NULLABLE'},
{'name': 'timestamp', 'type': 'TIMESTAMP', 'mode': 'REQUIRED'}
],
write_disposition='WRITE_TRUNCATE',
skip_leading_rows=1,
allow_quoted_newlines=True
)
# Обрабатываем и объединяем данные
process_data = BigQueryExecuteQueryOperator(
task_id='process_data',
sql="""
MERGE `project.dataset.target_table` T
USING (
SELECT
id,
value,
timestamp
FROM `project.dataset.temp_table`
WHERE timestamp >= '{{ ds }}'
AND timestamp < '{{ tomorrow_ds }}'
) S
ON T.id = S.id AND DATE(T.timestamp) = DATE(S.timestamp)
WHEN MATCHED THEN
UPDATE SET value = S.value, timestamp = S.timestamp
WHEN NOT MATCHED THEN
INSERT (id, value, timestamp) VALUES (id, value, timestamp)
""",
use_legacy_sql=False
)
# Проверяем результаты обработки
validate_results = BigQueryCheckOperator(
task_id='validate_results',
sql=f"SELECT COUNT(*) > 0 FROM `project.dataset.target_table` WHERE DATE(timestamp) = '{{ ds }}'",
use_legacy_sql=False
)
# Определяем порядок выполнения задач
check_source_table >> load_new_data >> process_data >> validate_results
Streamlit: создание интерактивных дашбордов для облачных данных
Streamlit позволяет быстро создавать интерактивные веб-приложения для визуализации облачных данных.
Пример дашборда для анализа данных из AWS S3:
import streamlit as st
import boto3
import pandas as pd
import plotly.express as px
from io import StringIO
# Настройка страницы
st.set_page_config(page_title="Cloud Data Explorer", layout="wide")
st.title("Cloud Data Explorer")
# Боковая панель для настроек
with st.sidebar:
st.header("Настройки подключения")
# Выбор региона
aws_region = st.selectbox("Выберите регион AWS:",
["us-east-1", "us-east-2", "us-west-1", "us-west-2",
"eu-west-1", "eu-central-1", "ap-southeast-1"])
# Ввод данных для подключения
aws_access_key = st.text_input("AWS Access Key ID", type="password")
aws_secret_key = st.text_input("AWS Secret Access Key", type="password")
# Кнопка для подключения
connect_button = st.button("Подключиться")
# Функция для получения списка бакетов
@st.cache_data
def get_buckets(region, access_key, secret_key):
try:
s3 = boto3.client('s3',
region_name=region,
aws_access_key_id=access_key,
aws_secret_access_key=secret_key)
response = s3.list_buckets()
return [bucket['Name'] for bucket in response['Buckets']]
except Exception as e:
st.error(f"Ошибка подключения: {str(e)}")
return []
# Функция для получения объектов из бакета
@st.cache_data
def get_objects(bucket_name, region, access_key, secret_key):
s3 = boto3.client('s3',
region_name=region,
aws_access_key_id=access_key,
aws_secret_access_key=secret_key)
response = s3.list_objects_v2(Bucket=bucket_name)
if 'Contents' in response:
return [obj['Key'] for obj in response['Contents'] if obj['Key'].endswith('.csv')]
return []
# Функция для загрузки данных из CSV
@st.cache_data
def load_data(bucket_name, object_key, region, access_key, secret_key):
s3 = boto3.client('s3',
region_name=region,
aws_access_key_id=access_key,
aws_secret_access_key=secret_key)
obj = s3.get_object(Bucket=bucket_name, Key=object_key)
data = obj['Body'].read().decode('utf-8')
return pd.read_csv(StringIO(data))
# Основная часть приложения
if connect_button:
with st.spinner("Подключение к AWS S3..."):
buckets = get_buckets(aws_region, aws_access_key, aws_secret_key)
if buckets:
st.success(f"Успешно подключено! Найдено {len(buckets)} бакетов.")
# Выбор бакета
selected_bucket = st.selectbox("Выберите бакет:", buckets)
# Получаем список CSV-файлов
csv_files = get_objects(selected_bucket, aws_region, aws_access_key, aws_secret_key)
if csv_files:
selected_file = st.selectbox("Выберите CSV-файл:", csv_files)
# Загружаем данные из выбранного файла
data = load_data(selected_bucket, selected_file, aws_region, aws_access_key, aws_secret_key)
# Отображаем первые строки данных
st.subheader("Предпросмотр данных")
st.dataframe(data.head())
# Статистическая информация
st.subheader("Статистика")
# Две колонки для статистики
col1, col2 = st.columns(2)
with col1:
st.metric("Количество строк", data.shape[0])
st.metric("Количество столбцов", data.shape[1])
with col2:
if data.shape[0] > 0:
# Подсчет пустых значений
missing_values = data.isnull().sum().sum()
st.metric("Пропущенные значения", missing_values)
# Размер данных
memory_usage = data.memory_usage(deep=True).sum()
st.metric("Объем памяти (байт)", memory_usage)
# Визуализация данных
st.subheader("Визуализация")
# Выбор числовых столбцов для визуализации
numeric_columns = data.select_dtypes(include=['int64', 'float64']).columns.tolist()
if numeric_columns:
# Выбор типа графика
chart_type = st.selectbox(
"Выберите тип графика:",
["Гистограмма", "Линейный график", "Диаграмма рассеяния"]
)
if chart_type == "Гистограмма":
column = st.selectbox("Выберите столбец для гистограммы:", numeric_columns)
fig = px.histogram(data, x=column)
st.plotly_chart(fig, use_container_width=True)
elif chart_type == "Линейный график":
x_column = st.selectbox("Выберите столбец для оси X:", data.columns.tolist())
y_column = st.selectbox("Выберите столбец для оси Y:", numeric_columns)
fig = px.line(data, x=x_column, y=y_column)
st.plotly_chart(fig, use_container_width=True)
elif chart_type == "Диаграмма рассеяния":
x_column = st.selectbox("Выберите столбец для оси X:", numeric_columns)
y_column = st.selectbox("Выберите столбец для оси Y:",
[col for col in numeric_columns if col != x_colu