Python в IoT: создание умных устройств на простом языке – тренды
Для кого эта статья:
- Новички в программировании, заинтересованные в IoT и Python
- Опытные разработчики, ищущие новые возможности в IoT
Студенты и специалисты, желающие углубить знания в области разработки IoT-систем на Python
Python покоряет мир IoT стремительно и уверенно! Язык, изначально не предназначавшийся для встраиваемых систем, сегодня властвует в экосистеме умных устройств благодаря своей простоте и мощным библиотекам. Представьте: ваш холодильник, анализирующий содержимое и заказывающий продукты, умный термостат, предсказывающий ваши предпочтения, или система полива, реагирующая на прогноз погоды — всем этим можно управлять несколькими строками Python-кода. Погрузимся в практическое руководство по созданию интеллектуальных IoT-решений с помощью Python! 🐍✨
Хотите стать разработчиком, способным создавать IoT-системы любой сложности? Обучение Python-разработке от Skypro — ваш путь в мир интернета вещей. Наш курс включает специальный модуль по IoT-разработке, где вы создадите собственную умную систему под руководством экспертов отрасли. Выпускники курса уже разрабатывают умные дома, промышленные IoT-решения и носимые устройства. Ваше будущее в самой перспективной технологической нише начинается здесь!
Python и IoT: фундамент для умных устройств
Python и интернет вещей образуют мощный симбиоз, преобразующий наше взаимодействие с окружающими устройствами. Ключевое преимущество Python в контексте IoT — универсальность применения на всех уровнях IoT-архитектуры: от микроконтроллеров до облачной аналитики. Это устраняет необходимость переключаться между языками программирования на разных этапах проекта.
Давайте рассмотрим основные преимущества Python для IoT-разработки:
- Низкий порог входа: синтаксис Python интуитивно понятен даже новичкам, что позволяет быстро освоить базовые концепции программирования IoT-устройств
- Богатая экосистема библиотек: Python предлагает специализированные библиотеки для всех аспектов IoT — от взаимодействия с аппаратным обеспечением до машинного обучения и анализа данных
- Кросс-платформенность: код Python работает на разных операционных системах, что критически важно для гетерогенной среды IoT
- Масштабируемость: от простейших домашних проектов до промышленных систем — Python эффективен на любом уровне сложности
- Интеграция с облачными сервисами: нативная поддержка взаимодействия с AWS, Google Cloud, Azure и другими облачными провайдерами IoT
Чтобы понимать архитектуру типичного IoT-решения на Python, рассмотрим следующую схему взаимодействия компонентов:
| Уровень архитектуры | Компоненты | Python-технологии |
|---|---|---|
| Сенсоры и актуаторы | Физические устройства сбора данных и воздействия на среду | MicroPython, CircuitPython |
| Шлюзы и концентраторы | Устройства агрегации и предварительной обработки данных | Raspberry Pi OS + Python, Pycom |
| Передача данных | Протоколы коммуникации устройств | Paho-MQTT, Requests, WebSockets |
| Облачная обработка | Серверы анализа и хранения данных | Flask, Django, Pandas, TensorFlow |
| Пользовательский интерфейс | Визуализация и управление системой | Dash, Streamlit, Flask + JavaScript |
Представьте обычный термостат. С Python он превращается в умную систему климат-контроля, которая учится у ваших предпочтений, анализирует погодные данные и оптимизирует энергопотребление. Всего несколько сотен строк кода — и вы получаете функциональность, сравнимую с коммерческими продуктами стоимостью в сотни долларов. 🔥
Алексей Кириллов, IoT-инженер и руководитель проектов Однажды мне поручили создать систему мониторинга для теплиц сельскохозяйственного комплекса. Бюджет был ограничен, а сроки — катастрофически сжаты. Я выбрал Python как основу решения, хотя руководство скептически отнеслось к этому выбору, предпочитая C++ для встраиваемых систем.
За две недели мы развернули сеть из 50 датчиков, собирающих данные о температуре, влажности и освещенности. Python с библиотекой Pandas обрабатывал поток информации, а Flask обеспечивал веб-интерфейс для мониторинга. Когда система обнаружила аномальное снижение температуры в одной из теплиц (лопнула труба отопления), она автоматически подняла тревогу и активировала резервные обогреватели.
Это спасло урожай стоимостью более 2 млн рублей. После этого случая скептицизм по отношению к Python в IoT-проектах исчез, и язык стал стандартом для всех новых разработок в компании. Python доказал, что может быть не только прототипом, но и полноценным производственным решением для критически важных IoT-систем.

Настройка среды разработки для IoT-проектов на Python
Правильная настройка среды разработки — это фундамент успешного IoT-проекта. Она должна обеспечивать удобное написание кода, отладку, тестирование и развертывание. Для IoT-разработки на Python требуется специфический набор инструментов, отличающийся от стандартного веб-разработческого стека. 🛠️
Начнем с базовой среды для разработки IoT-проектов на Python:
- Python 3.7+: новейшие версии обеспечивают лучшую производительность и поддержку асинхронного программирования через asyncio, что критически важно для IoT
- IDE с поддержкой удалённой отладки: PyCharm Professional или VS Code с Python-расширениями для работы с удаленными устройствами
- Менеджер виртуальных окружений: venv или conda для изоляции зависимостей проекта
- Git: для контроля версий, особенно важного при коллективной разработке IoT-решений
- Docker: для создания изолированных сред тестирования и упрощения развертывания
Для разных типов IoT-устройств потребуются специфические инструменты. Рассмотрим наиболее популярные варианты:
| Тип устройства | Необходимые инструменты | Команды для установки |
|---|---|---|
| Микроконтроллеры (ESP32, ESP8266) | esptool, ampy, rshell | pip install esptool adafruit-ampy rshell |
| Raspberry Pi | RPi.GPIO, pigpio | pip install RPi.GPIO pigpio |
| Arduino с Python-интерфейсом | pyserial, pyFirmata | pip install pyserial pyfirmata |
| IoT шлюзы | paho-mqtt, AWSIoTPythonSDK | pip install paho-mqtt AWSIoTPythonSDK |
| Симуляция IoT-устройств | mininet, cooja | Требуют отдельной установки |
Пошаговая настройка среды разработки для ESP32 (популярная платформа для IoT):
- Установите Python 3.8+ с официального сайта python.org
- Создайте виртуальное окружение:
python -m venv iot_env - Активируйте окружение:
- Windows:
iot_env\Scripts\activate - Linux/Mac:
source iot_env/bin/activate
- Windows:
- Установите необходимые пакеты:
pip install esptool adafruit-ampy rshell - Загрузите прошивку MicroPython:
esptool.py --port /dev/ttyUSB0 --baud 460800 write_flash -z 0x1000 esp32-20210902-v1.17.bin - Проверьте соединение:
ampy --port /dev/ttyUSB0 ls - Настройте IDE для удобной разработки и отладки
Для профессиональной разработки рекомендую настроить систему непрерывной интеграции (CI/CD) даже для небольших IoT-проектов. Автоматические тесты, проверяющие код перед отправкой на устройства, помогут избежать ситуаций, когда ошибка в коде обнаруживается уже после физического развертывания на десятках устройств.
Отладка IoT-систем имеет свою специфику. В отличие от веб-разработки, здесь вы часто не можете просто подключить отладчик к устройству. Полезно настроить систему логирования, которая будет отправлять данные на центральный сервер. Для этих целей отлично подходят инструменты как ELK Stack (Elasticsearch, Logstash, Kibana) или более легковесный вариант — связка MQTT + InfluxDB + Grafana.
Библиотеки MicroPython: управление устройствами с нуля
MicroPython — это компактная реализация Python 3, оптимизированная для работы на микроконтроллерах и в ограниченных средах. Он позволяет писать элегантный Python-код для управления устройствами, которые традиционно программировались на низкоуровневых языках. Это революционное решение для IoT-разработки, соединяющее простоту Python с возможностями встроенных систем. 💡
Ключевые библиотеки MicroPython для IoT-разработки:
- machine: базовый модуль для взаимодействия с аппаратными интерфейсами (GPIO, I2C, SPI, UART)
- network: управление Wi-Fi и другими сетевыми подключениями
- umqtt.simple: облегченная реализация MQTT-клиента для IoT-коммуникации
- utime: функции для работы со временем и таймерами
- ujson: обработка JSON-данных для обмена информацией
Рассмотрим практический пример управления светодиодом с помощью MicroPython на ESP32:
# Базовое управление светодиодом
from machine import Pin
import utime
# Настройка GPIO2 как выходной пин
led = Pin(2, Pin.OUT)
# Функция мигания с заданной частотой
def blink(times, interval=0.5):
for _ in range(times):
led.on()
utime.sleep(interval)
led.off()
utime.sleep(interval)
# Мигаем 5 раз с интервалом 0.2 секунды
blink(5, 0.2)
Теперь усложним задачу и создадим код для считывания данных с датчика температуры DHT22 и отправки их через MQTT:
import dht
import machine
import network
import utime
from umqtt.simple import MQTTClient
# Настройка датчика DHT22 на пине 4
sensor = dht.DHT22(machine.Pin(4))
# Подключение к Wi-Fi
def connect_wifi(ssid, password):
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
if not wlan.isconnected():
print('Подключение к Wi-Fi...')
wlan.connect(ssid, password)
while not wlan.isconnected():
utime.sleep(1)
print('Подключено к WiFi:', wlan.ifconfig())
return wlan
# Подключение к MQTT-брокеру
def connect_mqtt(client_id, mqtt_server, user=None, password=None):
client = MQTTClient(client_id, mqtt_server, user=user, password=password)
client.connect()
print('Подключено к MQTT-брокеру')
return client
# Основная логика работы устройства
def main():
# Настройка подключений
connect_wifi('YOUR_WIFI_SSID', 'YOUR_WIFI_PASSWORD')
mqtt = connect_mqtt('temp_sensor1', 'mqtt.example.com')
# Основной цикл
try:
while True:
try:
# Считывание данных с датчика
sensor.measure()
temp = sensor.temperature()
hum = sensor.humidity()
# Формирование сообщения
msg = f'{{"temperature": {temp}, "humidity": {hum}}}'
print('Отправка:', msg)
# Отправка в MQTT
mqtt.publish(b'sensors/dht22', msg.encode())
except Exception as e:
print('Ошибка:', e)
# Пауза между измерениями
utime.sleep(60)
except KeyboardInterrupt:
print('Остановка...')
finally:
# Корректное завершение работы
mqtt.disconnect()
# Запуск программы
if __name__ == '__main__':
main()
При разработке с использованием MicroPython следует помнить о следующих особенностях и лучших практиках:
- Ограничения памяти: оптимизируйте код, избегайте больших строк и сложных структур данных
- Энергосбережение: используйте режимы глубокого сна (deep sleep) для устройств на батареях
- Обработка ошибок: всегда предусматривайте механизмы восстановления и повторного подключения
- Безопасность: внедряйте шифрование и аутентификацию даже в небольших проектах
- Модульность: разделяйте код на логические блоки для упрощения обновлений
Михаил Сергеев, разработчик встраиваемых систем В прошлом году мы столкнулись с проблемой при разработке системы мониторинга для промышленного холодильного оборудования. Клиент требовал быстрого развертывания на 200 единицах техники в разных городах. Традиционно мы использовали C++ для подобных проектов, но сроки были настолько сжатыми, что потребовался новый подход.
Решение пришло неожиданно: MicroPython на ESP32. Изначально команда сопротивлялась — «Python слишком медленный для промышленных систем», говорили коллеги. Но я настоял на пилотном внедрении.
Мы разработали прототип за 2 дня вместо планируемых 2 недель! Система собирала данные с температурных датчиков, анализировала их и отправляла предупреждения при отклонениях. Код получился в 4 раза короче, чем эквивалентный на C++.
Самым впечатляющим стало удаленное обновление. Когда в коде обнаружилась ошибка (некорректный расчет среднего значения температуры), мы смогли обновить все устройства по воздуху, просто отправив новый Python-скрипт. С C++ пришлось бы перепрошивать каждое устройство, что означало бы командировки специалистов в разные города.
Проект был сдан на неделю раньше срока, а клиент получил бонус — веб-интерфейс для мониторинга, который мы успели разработать благодаря сэкономленному времени. С тех пор MicroPython стал нашим стандартным инструментом для 70% проектов IoT.
Сбор и обработка данных с IoT-сенсоров в реальном времени
Сбор и анализ данных — сердце любого IoT-проекта. Python предоставляет мощные инструменты для работы с потоками сенсорных данных, их обработки, фильтрации и визуализации в режиме реального времени. Эффективная работа с данными позволяет превратить простой мониторинг в интеллектуальную систему, способную принимать решения автономно. 📊
Рассмотрим основные этапы обработки данных с IoT-устройств и соответствующие Python-библиотеки:
- Сбор данных: интерфейсные библиотеки для различных сенсоров (Adafruit_CircuitPython, gpiozero)
- Передача данных: протокольные библиотеки (paho-mqtt, requests, websockets)
- Хранение данных: базы данных временных рядов (influxdb-python, prometheus-client)
- Обработка потоков: потоковая аналитика (Apache Kafka с confluent-kafka, RxPY)
- Анализ и машинное обучение: аналитические инструменты (NumPy, pandas, scikit-learn)
- Визуализация: интерактивные дашборды (Dash, Bokeh, Streamlit)
Практический пример сбора и обработки данных с температурного сенсора с использованием MQTT и InfluxDB:
# На стороне IoT-устройства
import time
import random
import paho.mqtt.client as mqtt
import json
# Имитация сенсора температуры
def get_sensor_data():
return {
"temperature": 25 + random.uniform(-5, 5),
"humidity": 60 + random.uniform(-10, 10),
"timestamp": int(time.time())
}
# Настройка MQTT-клиента
client = mqtt.Client("temperature_sensor")
client.connect("mqtt.example.com", 1883, 60)
# Отправка данных каждые 5 секунд
try:
while True:
data = get_sensor_data()
client.publish("sensors/temperature", json.dumps(data))
print(f"Отправлено: {data}")
time.sleep(5)
except KeyboardInterrupt:
client.disconnect()
print("Соединение закрыто")
# На серверной стороне
import paho.mqtt.client as mqtt
from influxdb import InfluxDBClient
import json
from datetime import datetime
import numpy as np
# Буфер для хранения последних 10 измерений
temperature_buffer = []
MAX_BUFFER_SIZE = 10
# Соединение с InfluxDB
influx_client = InfluxDBClient('localhost', 8086, 'admin', 'password', 'iot_sensors')
influx_client.create_database('iot_sensors')
# Обработчики MQTT-событий
def on_connect(client, userdata, flags, rc):
print(f"Подключено к MQTT-брокеру с кодом: {rc}")
client.subscribe("sensors/temperature")
def on_message(client, userdata, msg):
try:
# Декодирование полученных данных
payload = json.loads(msg.payload)
temperature = float(payload["temperature"])
timestamp = payload.get("timestamp", int(datetime.now().timestamp()))
# Добавление в буфер для скользящего среднего
temperature_buffer.append(temperature)
if len(temperature_buffer) > MAX_BUFFER_SIZE:
temperature_buffer.pop(0)
# Расчёт скользящего среднего и стандартного отклонения
avg_temp = np.mean(temperature_buffer)
std_temp = np.std(temperature_buffer)
# Детекция аномалий (z-score > 2)
is_anomaly = abs(temperature – avg_temp) > 2 * std_temp
# Подготовка данных для InfluxDB
json_body = [
{
"measurement": "temperature",
"tags": {
"sensor_id": msg.topic.split('/')[-1],
"is_anomaly": str(is_anomaly)
},
"time": datetime.fromtimestamp(timestamp).isoformat(),
"fields": {
"value": temperature,
"moving_avg": avg_temp,
"std_dev": std_temp
}
}
]
# Запись в базу данных
influx_client.write_points(json_body)
# Вывод информации
print(f"Получено: {temperature}°C, Среднее: {avg_temp:.2f}°C, Аномалия: {is_anomaly}")
# Реакция на аномалии
if is_anomaly:
# Здесь может быть код для оповещения или автоматической реакции
client.publish("alerts/temperature", json.dumps({
"sensor": msg.topic.split('/')[-1],
"value": temperature,
"threshold": avg_temp + 2 * std_temp,
"timestamp": timestamp
}))
except Exception as e:
print(f"Ошибка обработки сообщения: {e}")
# Настройка MQTT-клиента
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
# Подключение к брокеру
client.connect("mqtt.example.com", 1883, 60)
# Запуск цикла обработки
client.loop_forever()
При работе с данными IoT важно учитывать следующие аспекты:
| Проблема | Решение с использованием Python |
|---|---|
| Неполные или пропущенные данные | Методы заполнения из pandas (interpolate, fillna) или прогнозирование с ARIMA |
| Шумы и выбросы | Фильтры Калмана (pykalman) или медианные фильтры (scipy.signal) |
| Разная частота поступления данных | Ресемплинг с помощью pandas.resample() |
| Большие объемы данных | Потоковая обработка с Dask или Apache Spark (PySpark) |
| Предиктивная аналитика | Машинное обучение с scikit-learn, Prophet или TensorFlow |
Для визуализации данных в реальном времени Python предлагает несколько мощных фреймворков. Один из самых популярных — Dash от Plotly, который позволяет создавать интерактивные аналитические дашборды с минимальным количеством кода:
# Простой дашборд для визуализации температурных данных
import dash
from dash import dcc, html
from dash.dependencies import Input, Output
import plotly.graph_objs as go
from influxdb import InfluxDBClient
import pandas as pd
# Соединение с базой данных
influx_client = InfluxDBClient('localhost', 8086, 'admin', 'password', 'iot_sensors')
# Создание приложения Dash
app = dash.Dash(__name__)
# Макет дашборда
app.layout = html.Div([
html.H1("Мониторинг температуры в реальном времени"),
dcc.Graph(id='live-temperature-graph'),
dcc.Interval(
id='interval-component',
interval=5*1000, # в миллисекундах (обновление каждые 5 секунд)
n_intervals=0
)
])
# Функция обновления графика
@app.callback(
Output('live-temperature-graph', 'figure'),
[Input('interval-component', 'n_intervals')]
)
def update_graph(n):
# Запрос последних данных из InfluxDB
query = 'SELECT time, value, moving_avg FROM temperature WHERE time > now() – 1h'
result = influx_client.query(query)
# Преобразование в DataFrame
points = list(result.get_points())
df = pd.DataFrame(points)
if not df.empty:
df['time'] = pd.to_datetime(df['time'])
# Создание графика
figure = {
'data': [
go.Scatter(
x=df['time'],
y=df['value'],
name='Температура',
mode='lines+markers',
marker=dict(color='red')
),
go.Scatter(
x=df['time'],
y=df['moving_avg'],
name='Скользящее среднее',
mode='lines',
line=dict(color='blue', dash='dash')
)
],
'layout': go.Layout(
title='Температурные показатели за последний час',
xaxis=dict(title='Время'),
yaxis=dict(title='Температура (°C)'),
hovermode='closest'
)
}
else:
figure = {
'data': [],
'layout': go.Layout(
title='Нет данных за последний час'
)
}
return figure
if __name__ == '__main__':
app.run_server(debug=True)
Используя эти инструменты и подходы, вы можете создать полноценную систему сбора и анализа данных с IoT-устройств. Python делает этот процесс доступным даже для разработчиков без глубоких знаний в области анализа данных, предоставляя интуитивно понятные API и обширную документацию. Комбинирование IoT-сенсоров с алгоритмами машинного обучения открывает путь к созданию по-настоящему интеллектуальных систем, способных не только собирать данные, но и принимать на их основе автономные решения. 🤖
Создание полноценного IoT-проекта: от концепции до реализации
Создание комплексного IoT-проекта требует не только технических знаний, но и методического подхода к проектированию. Рассмотрим процесс разработки полноценной IoT-системы на примере умной теплицы — проекта, объединяющего различные аспекты IoT-разработки на Python. 🌱
Этапы разработки IoT-проекта:
- Определение требований и проектирование системы
- Выбор аппаратных компонентов и технологий
- Разработка программного обеспечения для устройств
- Создание серверной части и базы данных
- Разработка пользовательского интерфейса
- Тестирование и отладка системы
- Развертывание и масштабирование решения
Для нашего примера создадим умную теплицу с автоматическим контролем климата, системой полива и удаленным мониторингом. Архитектура системы будет включать следующие компоненты:
- Сенсорный узел: ESP32 с MicroPython, датчики температуры, влажности почвы и воздуха, освещенности
- Актуаторы: системы полива, вентиляции, освещения и обогрева
- Шлюз: Raspberry Pi, агрегирующий данные и обеспечивающий локальное управление
- Облачный сервер: Flask API, база данных InfluxDB, алгоритмы предсказания
- Пользовательский интерфейс: веб-приложение и мобильное приложение
Рассмотрим ключевой компонент — код для контроллера теплицы на ESP32 с MicroPython:
# greenhouse_controller.py
import time
import machine
import dht
import network
from umqtt.robust import MQTTClient
import ujson as json
# Настройка пинов
DHT_PIN = 15
SOIL_MOISTURE_PIN = 32
LIGHT_SENSOR_PIN = 33
PUMP_PIN = 25
FAN_PIN = 26
LIGHT_PIN = 27
HEATER_PIN = 14
# Инициализация сенсоров
dht_sensor = dht.DHT22(machine.Pin(DHT_PIN))
soil_sensor = machine.ADC(machine.Pin(SOIL_MOISTURE_PIN))
soil_sensor.atten(machine.ADC.ATTN_11DB) # Полный диапазон: 3.3V
light_sensor = machine.ADC(machine.Pin(LIGHT_SENSOR_PIN))
light_sensor.atten(machine.ADC.ATTN_11DB)
# Инициализация актуаторов
pump = machine.Pin(PUMP_PIN, machine.Pin.OUT)
fan = machine.Pin(FAN_PIN, machine.Pin.OUT)
light = machine.Pin(LIGHT_PIN, machine.Pin.OUT)
heater = machine.Pin(HEATER_PIN, machine.Pin.OUT)
# Идентификатор устройства
DEVICE_ID = "greenhouse_1"
# Настройки MQTT
MQTT_HOST = "192.168.1.10"
MQTT_PORT = 1883
MQTT_USER = "greenhouse"
MQTT_PASSWORD = "secret"
MQTT_TOPIC_TELEMETRY = f"devices/{DEVICE_ID}/telemetry"
MQTT_TOPIC_CONTROL = f"devices/{DEVICE_ID}/control"
# Настройки Wi-Fi
WIFI_SSID = "YOUR_WIFI_SSID"
WIFI_PASSWORD = "YOUR_WIFI_PASSWORD"
# Функция подключения к Wi-Fi
def connect_wifi():
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
if not wlan.isconnected():
print(f"Connecting to {WIFI_SSID}...")
wlan.connect(WIFI_SSID, WIFI_PASSWORD)
max_wait = 10
while max_wait > 0:
if wlan.isconnected():
break
max_wait -= 1
time.sleep(1)
if wlan.isconnected():
print(f"Connected to {WIFI_SSID}")
print(f"IP: {wlan.ifconfig()[0]}")
else:
print(f"Failed to connect to {WIFI_SSID}")
machine.reset() # Перезагружаемся при неудаче
return wlan
# Функция для чтения данных с сенсоров
def read_sensors():
try:
# Чтение DHT22
dht_sensor.measure()
temperature = dht_sensor.temperature()
humidity = dht_sensor.humidity()
# Чтение влажности почвы (преобразование в проценты)
soil_value = soil_sensor.read()
soil_moisture = 100 – (soil_value / 4095 * 100)
# Чтение освещенности (преобразование в проценты)
light_value = light_sensor.read()
light_level = light_value / 4095 * 100
return {
"temperature": temperature,
"humidity": humidity,
"soil_moisture": soil_moisture,
"light_level": light_level,
"timestamp": time.time()
}
except Exception as e:
print(f"Error reading sensors: {e}")
return None
# Обработчик MQTT-сообщений для управления актуаторами
def on_control_message(topic, message):
try:
control = json.loads(message)
# Управление насосом
if "pump" in control:
pump.value(1 if control["pump"] else 0)
print(f"Pump set to: {control['pump']}")
# Управление вентилятором
if "fan" in control:
fan.value(1 if control["fan"] else 0)
print(f"Fan set to: {control['fan']}")
# Управление освещением
if "light" in control:
light.value(1 if control["light"] else 0)
print(f"Light set to: {control['light']}")
# Управление обогревателем
if "heater" in control:
heater.value(1 if control["heater"] else 0)
print(f"Heater set to: {control['heater']}")
# Подтверждение получения команды
client.publish(
f"devices/{DEVICE_ID}/status",
json.dumps({"received": control, "status": "ok"})
)
except Exception as e:
print(f"Error processing control message: {e}")
client.publish(
f"devices/{DEVICE_ID}/status",
json.dumps({"error": str(e)})
)
# Инициализация MQTT-клиента
def init_mqtt():
global client
# Создание клиента
client = MQTTClient(
DEVICE_ID,
MQTT_HOST,
port=MQTT_PORT,
user=MQTT_USER,
password=MQTT_PASSWORD,
keepalive=30
)
# Настройка обратного вызова
client.set_callback(on_control_message)
# Подключение
try:
client.connect()
print(f"Connected to MQTT broker at {MQTT_HOST}")
# Подписка на топик управления
client.subscribe(MQTT_TOPIC_CONTROL)
print(f"Subscribed to {MQTT_TOPIC_CONTROL}")
return client
except Exception as e:
print(f"Failed to connect to MQTT: {e}")
time.sleep(10)
machine.reset()
# Автоматическое управление теплицей
def auto_control(sensor_data):
# Алгоритм управления поливом
if sensor_data["soil_moisture"] < 30: # Если почва слишком сухая
pump.value(1) # Включаем полив
print("Auto: Pump ON (soil too dry)")
elif sensor_data["soil_moisture"] > 70: # Если почва достаточно влажная
pump.value(0) # Выключаем полив
print("Auto: Pump OFF (soil moist enough)")
# Алгоритм управления вентиляцией
if sensor_data["humidity"] > 85: # Если влажность слишком высокая
fan.value(1) # Включаем вентиляцию
print("Auto: Fan ON (humidity too high)")
elif sensor_data["humidity"] < 60: # Если влажность нормализовалась
fan.value(0) # Выключаем вентиляцию
print("Auto: Fan OFF (humidity normal)")
# Алгоритм управления освещением
if sensor_data["light_level"] < 20 and 6 <= time.localtime()[3] < 20: # Если темно в дневное время
light.value(1) # Включаем освещение
print("Auto: Light ON (too dark during day)")
elif sensor_data["light_level"] > 50 or time.localtime()[3] >= 20 or time.localtime()[3] < 6:
light.value(0) # Выключаем освещение ночью или при достаточной освещенности
print("Auto: Light OFF (night time or bright enough)")
# Алгоритм управления обогревом
if sensor_data["temperature"] < 18: # Если слишком холодно
heater.value(1) # Включаем обогрев
print("Auto: Heater ON (too cold)")
elif sensor_data["temperature"] > 25: # Если достаточно тепло
heater.value(0) # Выключаем обогрев
print("Auto: Heater OFF (warm enough)")
# Главная функция
def main():
# Подключение к Wi-Fi
wlan = connect_wifi()
# Инициализация MQTT
client = init_mqtt()
# Главный цикл
last_telemetry_time = 0
TELEMETRY_INTERVAL = 60 # Интервал отправки телеметрии (в секундах)
try:
while True:
# Проверка наличия входящих сообщений
client.check_msg()
# Чтение и отправка телеметрии с заданным интервалом
current_time = time.time()
if current_time – last_telemetry_time >= TELEMETRY_INTERVAL:
sensor_data = read_sensors()
if sensor_data:
# Отправка телеметрии
client.publish(MQTT_TOPIC_TELEMETRY, json.dumps(sensor_data))
print(f"Telemetry sent: {sensor_data}")
# Автоматическое управление
auto_control(sensor_data)
last_telemetry_time = current_time
# Короткая пауза для экономии ресурсов
time.sleep(1)
except Exception as e:
print(f"Runtime error: {e}")
time.sleep(10)
machine.reset()
# Запуск программы
if __name__ == "__main__":
main()
На стороне сервера реализуем Flask API для приема и обработки данных:
# server/app.py
from flask import Flask, request, jsonify
from influxdb import InfluxDBClient
import datetime
import pandas as pd
from sklearn.ensemble import RandomForestRegressor
import schedule
import threading
import time
app = Flask(__name__)
# Подключение к InfluxDB
influx_client = InfluxDBClient(host='localhost', port=8086)
influx_client.switch_database('greenhouse_db')
# Модель машинного обучения для предсказания
model = None
# Функция для обучения модели предсказания на исторических данных
def train_prediction_model():
global model
# Запрос исторических данных за последнюю неделю
query = 'SELECT * FROM temperature,humidity,soil_moisture,light_level WHERE time > now() – 7d'
result = influx_client.query(query)
# Преобразование в DataFrame
data = []
for point in result.get_points():
data.append(point)
if not data:
print("No historical data available for training")
return
df = pd.DataFrame(data)
df['time'] = pd.to_datetime(df['time'])
# Извлечение признаков для машинного обучения
df['hour'] = df['time'].dt.hour
df['day_of_week'] = df['time'].dt.dayofweek
# Подготовка данных для обучения
X = df[['hour', 'day_of_week', 'humidity', 'light_level']]
y = df['temperature'] # Предсказываем температуру
# Обучение модели
model = RandomForestRegressor(n_estimators=100, random_state=42)
model.fit(X, y)
print("Prediction model trained successfully")
# Планирование ежедневного переобучения модели
def schedule_model_training():
schedule.every().day.at("02:00").do(train_prediction_model)
while True:
schedule.run_pending()
time.sleep(60)
# API-конечная точка для приема телеметрии
@app.route('/api/telemetry', methods=['POST'])
def receive_telemetry():
data = request.json
if not data or 'device_id' not in data:
return jsonify({"error": "Invalid data format"}), 400
device_id = data['device_id']
telemetry = data['telemetry']
timestamp = datetime.datetime.utcnow().isoformat()
# Подготовка данных для InfluxDB
points = []
for key, value in telemetry.items():
if key != 'timestamp': # Исключаем timestamp из устройства
point = {
"measurement": key,
"tags": {
"device_id": device_id
},
"time": timestamp,
"fields": {
"value": float(value)
}
}
points.append(point)
# Запись в InfluxDB
if points:
influx_client.write_points(points)
return jsonify({"status": "ok"})
# API-конечная точка для отправки управляющих команд
@app.route('/api/control/<device_id>', methods=['POST'])
def send_control(device_id):
# Здесь должна быть логика для отправки команд через MQTT
# В реальном приложении это будет интегрировано с MQTT-брокером
return jsonify({"status": "command sent", "device_id": device_id})
# API-конечная точка для получения предсказаний
@app.route('/api/predict/<device_id>', methods=['GET'])
def get_prediction(device_id):
if model is None:
return jsonify({"error": "Prediction model not trained yet"}), 503
# Получение последних данных для устройства
query = f'SELECT last("value") FROM humidity,light_level WHERE "device_id" = \'{device_id}\' GROUP BY "measurement"'
result = influx_client.query(query)
# Подготовка данных для предсказания
current_hour = datetime.datetime.now().hour
current_day = datetime.datetime.now().weekday()
humidity = None
light_level = None
for point in result.get_points(measurement='humidity'):
humidity = point['last']
for point in result.get_points(measurement='light_level'):
light_level = point['last']
if humidity is None or light_level is None:
return jsonify({"error": "Insufficient data for prediction"}), 400
# Выполнение предсказания
prediction = model.predict([[current_hour, current_day, humidity, light_level]])
return jsonify({
"device_id": device_id,
"predicted_temperature": float(prediction[0]),
"timestamp": datetime.datetime.now().isoformat()
})
if __name__ == '__main__':
# Запуск обучения модели в отдельном потоке
threading.Thread(target=schedule_model_training, daemon=True).start()
# Первоначальное обучение модели
train_prediction_model()
# Запуск Flask-сервера
app.run(host='0.0.0.0', port=5000)
Для завершения проекта необходимо создать интерфейс пользователя. Можно использовать Dash или React для веб-интерфейса. Вот пример простого дашборда на Dash:
# dashboard/app.py
import dash
from dash import dcc, html
from dash.dependencies import Input, Output, State
import plotly.graph_objs as go
from influxdb import InfluxDBClient
import pandas as pd
import requests
import datetime
# Подключение к InfluxDB
influx_client = InfluxDBClient(host='localhost', port=8086)
influx_client.switch_database('greenhouse_db')
# Создание приложения Dash
app = dash.Dash(__name__, meta_tags=[
{"name": "viewport", "content": "width=device-width, initial-scale=1"}
])
# Макет дашборда
app.layout = html.Div([
html.H1("Умная теплица: Система мониторинга и управления"),
html.Div([
html.Div([
html.H3("Выбор устройства"),
dcc.Dropdown(
id='device-selector',
options=[
{'label': 'Теплица 1', 'value': 'greenhouse_1'},
{'label': 'Теплица 2', 'value': 'greenhouse_2'}
],
value='greenhouse_1'
)
], className="four columns"),
html.Div([
html.H3("Период времени"),
dcc.RadioItems(
id='time-range',
options=[
{'label': 'Последний час', 'value': '1h'},
{'label': 'Последний день', 'value': '1d'},
{'label': 'Последняя неделя', 'value': '7d'}
],
value='1h'
)
], className="four columns"),
html.Div([
html.H3("Текущий статус