Python в IoT: создание умных устройств на простом языке – тренды

Пройдите тест, узнайте какой профессии подходите
Сколько вам лет
0%
До 18
От 18 до 24
От 25 до 34
От 35 до 44
От 45 до 49
От 50 до 54
Больше 55

Для кого эта статья:

  • Новички в программировании, заинтересованные в IoT и Python
  • Опытные разработчики, ищущие новые возможности в IoT
  • Студенты и специалисты, желающие углубить знания в области разработки IoT-систем на Python

    Python покоряет мир IoT стремительно и уверенно! Язык, изначально не предназначавшийся для встраиваемых систем, сегодня властвует в экосистеме умных устройств благодаря своей простоте и мощным библиотекам. Представьте: ваш холодильник, анализирующий содержимое и заказывающий продукты, умный термостат, предсказывающий ваши предпочтения, или система полива, реагирующая на прогноз погоды — всем этим можно управлять несколькими строками Python-кода. Погрузимся в практическое руководство по созданию интеллектуальных IoT-решений с помощью Python! 🐍✨

Хотите стать разработчиком, способным создавать IoT-системы любой сложности? Обучение Python-разработке от Skypro — ваш путь в мир интернета вещей. Наш курс включает специальный модуль по IoT-разработке, где вы создадите собственную умную систему под руководством экспертов отрасли. Выпускники курса уже разрабатывают умные дома, промышленные IoT-решения и носимые устройства. Ваше будущее в самой перспективной технологической нише начинается здесь!

Python и IoT: фундамент для умных устройств

Python и интернет вещей образуют мощный симбиоз, преобразующий наше взаимодействие с окружающими устройствами. Ключевое преимущество Python в контексте IoT — универсальность применения на всех уровнях IoT-архитектуры: от микроконтроллеров до облачной аналитики. Это устраняет необходимость переключаться между языками программирования на разных этапах проекта.

Давайте рассмотрим основные преимущества Python для IoT-разработки:

  • Низкий порог входа: синтаксис Python интуитивно понятен даже новичкам, что позволяет быстро освоить базовые концепции программирования IoT-устройств
  • Богатая экосистема библиотек: Python предлагает специализированные библиотеки для всех аспектов IoT — от взаимодействия с аппаратным обеспечением до машинного обучения и анализа данных
  • Кросс-платформенность: код Python работает на разных операционных системах, что критически важно для гетерогенной среды IoT
  • Масштабируемость: от простейших домашних проектов до промышленных систем — Python эффективен на любом уровне сложности
  • Интеграция с облачными сервисами: нативная поддержка взаимодействия с AWS, Google Cloud, Azure и другими облачными провайдерами IoT

Чтобы понимать архитектуру типичного IoT-решения на Python, рассмотрим следующую схему взаимодействия компонентов:

Уровень архитектуры Компоненты Python-технологии
Сенсоры и актуаторы Физические устройства сбора данных и воздействия на среду MicroPython, CircuitPython
Шлюзы и концентраторы Устройства агрегации и предварительной обработки данных Raspberry Pi OS + Python, Pycom
Передача данных Протоколы коммуникации устройств Paho-MQTT, Requests, WebSockets
Облачная обработка Серверы анализа и хранения данных Flask, Django, Pandas, TensorFlow
Пользовательский интерфейс Визуализация и управление системой Dash, Streamlit, Flask + JavaScript

Представьте обычный термостат. С Python он превращается в умную систему климат-контроля, которая учится у ваших предпочтений, анализирует погодные данные и оптимизирует энергопотребление. Всего несколько сотен строк кода — и вы получаете функциональность, сравнимую с коммерческими продуктами стоимостью в сотни долларов. 🔥

Алексей Кириллов, IoT-инженер и руководитель проектов Однажды мне поручили создать систему мониторинга для теплиц сельскохозяйственного комплекса. Бюджет был ограничен, а сроки — катастрофически сжаты. Я выбрал Python как основу решения, хотя руководство скептически отнеслось к этому выбору, предпочитая C++ для встраиваемых систем.

За две недели мы развернули сеть из 50 датчиков, собирающих данные о температуре, влажности и освещенности. Python с библиотекой Pandas обрабатывал поток информации, а Flask обеспечивал веб-интерфейс для мониторинга. Когда система обнаружила аномальное снижение температуры в одной из теплиц (лопнула труба отопления), она автоматически подняла тревогу и активировала резервные обогреватели.

Это спасло урожай стоимостью более 2 млн рублей. После этого случая скептицизм по отношению к Python в IoT-проектах исчез, и язык стал стандартом для всех новых разработок в компании. Python доказал, что может быть не только прототипом, но и полноценным производственным решением для критически важных IoT-систем.

Пошаговый план для смены профессии

Настройка среды разработки для IoT-проектов на Python

Правильная настройка среды разработки — это фундамент успешного IoT-проекта. Она должна обеспечивать удобное написание кода, отладку, тестирование и развертывание. Для IoT-разработки на Python требуется специфический набор инструментов, отличающийся от стандартного веб-разработческого стека. 🛠️

Начнем с базовой среды для разработки IoT-проектов на Python:

  • Python 3.7+: новейшие версии обеспечивают лучшую производительность и поддержку асинхронного программирования через asyncio, что критически важно для IoT
  • IDE с поддержкой удалённой отладки: PyCharm Professional или VS Code с Python-расширениями для работы с удаленными устройствами
  • Менеджер виртуальных окружений: venv или conda для изоляции зависимостей проекта
  • Git: для контроля версий, особенно важного при коллективной разработке IoT-решений
  • Docker: для создания изолированных сред тестирования и упрощения развертывания

Для разных типов IoT-устройств потребуются специфические инструменты. Рассмотрим наиболее популярные варианты:

Тип устройства Необходимые инструменты Команды для установки
Микроконтроллеры (ESP32, ESP8266) esptool, ampy, rshell pip install esptool adafruit-ampy rshell
Raspberry Pi RPi.GPIO, pigpio pip install RPi.GPIO pigpio
Arduino с Python-интерфейсом pyserial, pyFirmata pip install pyserial pyfirmata
IoT шлюзы paho-mqtt, AWSIoTPythonSDK pip install paho-mqtt AWSIoTPythonSDK
Симуляция IoT-устройств mininet, cooja Требуют отдельной установки

Пошаговая настройка среды разработки для ESP32 (популярная платформа для IoT):

  1. Установите Python 3.8+ с официального сайта python.org
  2. Создайте виртуальное окружение: python -m venv iot_env
  3. Активируйте окружение:
    • Windows: iot_env\Scripts\activate
    • Linux/Mac: source iot_env/bin/activate
  4. Установите необходимые пакеты: pip install esptool adafruit-ampy rshell
  5. Загрузите прошивку MicroPython: esptool.py --port /dev/ttyUSB0 --baud 460800 write_flash -z 0x1000 esp32-20210902-v1.17.bin
  6. Проверьте соединение: ampy --port /dev/ttyUSB0 ls
  7. Настройте IDE для удобной разработки и отладки

Для профессиональной разработки рекомендую настроить систему непрерывной интеграции (CI/CD) даже для небольших IoT-проектов. Автоматические тесты, проверяющие код перед отправкой на устройства, помогут избежать ситуаций, когда ошибка в коде обнаруживается уже после физического развертывания на десятках устройств.

Отладка IoT-систем имеет свою специфику. В отличие от веб-разработки, здесь вы часто не можете просто подключить отладчик к устройству. Полезно настроить систему логирования, которая будет отправлять данные на центральный сервер. Для этих целей отлично подходят инструменты как ELK Stack (Elasticsearch, Logstash, Kibana) или более легковесный вариант — связка MQTT + InfluxDB + Grafana.

Библиотеки MicroPython: управление устройствами с нуля

MicroPython — это компактная реализация Python 3, оптимизированная для работы на микроконтроллерах и в ограниченных средах. Он позволяет писать элегантный Python-код для управления устройствами, которые традиционно программировались на низкоуровневых языках. Это революционное решение для IoT-разработки, соединяющее простоту Python с возможностями встроенных систем. 💡

Ключевые библиотеки MicroPython для IoT-разработки:

  • machine: базовый модуль для взаимодействия с аппаратными интерфейсами (GPIO, I2C, SPI, UART)
  • network: управление Wi-Fi и другими сетевыми подключениями
  • umqtt.simple: облегченная реализация MQTT-клиента для IoT-коммуникации
  • utime: функции для работы со временем и таймерами
  • ujson: обработка JSON-данных для обмена информацией

Рассмотрим практический пример управления светодиодом с помощью MicroPython на ESP32:

Python
Скопировать код
# Базовое управление светодиодом
from machine import Pin
import utime

# Настройка GPIO2 как выходной пин
led = Pin(2, Pin.OUT)

# Функция мигания с заданной частотой
def blink(times, interval=0.5):
for _ in range(times):
led.on()
utime.sleep(interval)
led.off()
utime.sleep(interval)

# Мигаем 5 раз с интервалом 0.2 секунды
blink(5, 0.2)

Теперь усложним задачу и создадим код для считывания данных с датчика температуры DHT22 и отправки их через MQTT:

Python
Скопировать код
import dht
import machine
import network
import utime
from umqtt.simple import MQTTClient

# Настройка датчика DHT22 на пине 4
sensor = dht.DHT22(machine.Pin(4))

# Подключение к Wi-Fi
def connect_wifi(ssid, password):
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
if not wlan.isconnected():
print('Подключение к Wi-Fi...')
wlan.connect(ssid, password)
while not wlan.isconnected():
utime.sleep(1)
print('Подключено к WiFi:', wlan.ifconfig())
return wlan

# Подключение к MQTT-брокеру
def connect_mqtt(client_id, mqtt_server, user=None, password=None):
client = MQTTClient(client_id, mqtt_server, user=user, password=password)
client.connect()
print('Подключено к MQTT-брокеру')
return client

# Основная логика работы устройства
def main():
# Настройка подключений
connect_wifi('YOUR_WIFI_SSID', 'YOUR_WIFI_PASSWORD')
mqtt = connect_mqtt('temp_sensor1', 'mqtt.example.com')

# Основной цикл
try:
while True:
try:
# Считывание данных с датчика
sensor.measure()
temp = sensor.temperature()
hum = sensor.humidity()

# Формирование сообщения
msg = f'{{"temperature": {temp}, "humidity": {hum}}}'
print('Отправка:', msg)

# Отправка в MQTT
mqtt.publish(b'sensors/dht22', msg.encode())

except Exception as e:
print('Ошибка:', e)

# Пауза между измерениями
utime.sleep(60)

except KeyboardInterrupt:
print('Остановка...')

finally:
# Корректное завершение работы
mqtt.disconnect()

# Запуск программы
if __name__ == '__main__':
main()

При разработке с использованием MicroPython следует помнить о следующих особенностях и лучших практиках:

  • Ограничения памяти: оптимизируйте код, избегайте больших строк и сложных структур данных
  • Энергосбережение: используйте режимы глубокого сна (deep sleep) для устройств на батареях
  • Обработка ошибок: всегда предусматривайте механизмы восстановления и повторного подключения
  • Безопасность: внедряйте шифрование и аутентификацию даже в небольших проектах
  • Модульность: разделяйте код на логические блоки для упрощения обновлений

Михаил Сергеев, разработчик встраиваемых систем В прошлом году мы столкнулись с проблемой при разработке системы мониторинга для промышленного холодильного оборудования. Клиент требовал быстрого развертывания на 200 единицах техники в разных городах. Традиционно мы использовали C++ для подобных проектов, но сроки были настолько сжатыми, что потребовался новый подход.

Решение пришло неожиданно: MicroPython на ESP32. Изначально команда сопротивлялась — «Python слишком медленный для промышленных систем», говорили коллеги. Но я настоял на пилотном внедрении.

Мы разработали прототип за 2 дня вместо планируемых 2 недель! Система собирала данные с температурных датчиков, анализировала их и отправляла предупреждения при отклонениях. Код получился в 4 раза короче, чем эквивалентный на C++.

Самым впечатляющим стало удаленное обновление. Когда в коде обнаружилась ошибка (некорректный расчет среднего значения температуры), мы смогли обновить все устройства по воздуху, просто отправив новый Python-скрипт. С C++ пришлось бы перепрошивать каждое устройство, что означало бы командировки специалистов в разные города.

Проект был сдан на неделю раньше срока, а клиент получил бонус — веб-интерфейс для мониторинга, который мы успели разработать благодаря сэкономленному времени. С тех пор MicroPython стал нашим стандартным инструментом для 70% проектов IoT.

Сбор и обработка данных с IoT-сенсоров в реальном времени

Сбор и анализ данных — сердце любого IoT-проекта. Python предоставляет мощные инструменты для работы с потоками сенсорных данных, их обработки, фильтрации и визуализации в режиме реального времени. Эффективная работа с данными позволяет превратить простой мониторинг в интеллектуальную систему, способную принимать решения автономно. 📊

Рассмотрим основные этапы обработки данных с IoT-устройств и соответствующие Python-библиотеки:

  • Сбор данных: интерфейсные библиотеки для различных сенсоров (Adafruit_CircuitPython, gpiozero)
  • Передача данных: протокольные библиотеки (paho-mqtt, requests, websockets)
  • Хранение данных: базы данных временных рядов (influxdb-python, prometheus-client)
  • Обработка потоков: потоковая аналитика (Apache Kafka с confluent-kafka, RxPY)
  • Анализ и машинное обучение: аналитические инструменты (NumPy, pandas, scikit-learn)
  • Визуализация: интерактивные дашборды (Dash, Bokeh, Streamlit)

Практический пример сбора и обработки данных с температурного сенсора с использованием MQTT и InfluxDB:

Python
Скопировать код
# На стороне IoT-устройства
import time
import random
import paho.mqtt.client as mqtt
import json

# Имитация сенсора температуры
def get_sensor_data():
return {
"temperature": 25 + random.uniform(-5, 5),
"humidity": 60 + random.uniform(-10, 10),
"timestamp": int(time.time())
}

# Настройка MQTT-клиента
client = mqtt.Client("temperature_sensor")
client.connect("mqtt.example.com", 1883, 60)

# Отправка данных каждые 5 секунд
try:
while True:
data = get_sensor_data()
client.publish("sensors/temperature", json.dumps(data))
print(f"Отправлено: {data}")
time.sleep(5)
except KeyboardInterrupt:
client.disconnect()
print("Соединение закрыто")

Python
Скопировать код
# На серверной стороне
import paho.mqtt.client as mqtt
from influxdb import InfluxDBClient
import json
from datetime import datetime
import numpy as np

# Буфер для хранения последних 10 измерений
temperature_buffer = []
MAX_BUFFER_SIZE = 10

# Соединение с InfluxDB
influx_client = InfluxDBClient('localhost', 8086, 'admin', 'password', 'iot_sensors')
influx_client.create_database('iot_sensors')

# Обработчики MQTT-событий
def on_connect(client, userdata, flags, rc):
print(f"Подключено к MQTT-брокеру с кодом: {rc}")
client.subscribe("sensors/temperature")

def on_message(client, userdata, msg):
try:
# Декодирование полученных данных
payload = json.loads(msg.payload)
temperature = float(payload["temperature"])
timestamp = payload.get("timestamp", int(datetime.now().timestamp()))

# Добавление в буфер для скользящего среднего
temperature_buffer.append(temperature)
if len(temperature_buffer) > MAX_BUFFER_SIZE:
temperature_buffer.pop(0)

# Расчёт скользящего среднего и стандартного отклонения
avg_temp = np.mean(temperature_buffer)
std_temp = np.std(temperature_buffer)

# Детекция аномалий (z-score > 2)
is_anomaly = abs(temperature – avg_temp) > 2 * std_temp

# Подготовка данных для InfluxDB
json_body = [
{
"measurement": "temperature",
"tags": {
"sensor_id": msg.topic.split('/')[-1],
"is_anomaly": str(is_anomaly)
},
"time": datetime.fromtimestamp(timestamp).isoformat(),
"fields": {
"value": temperature,
"moving_avg": avg_temp,
"std_dev": std_temp
}
}
]

# Запись в базу данных
influx_client.write_points(json_body)

# Вывод информации
print(f"Получено: {temperature}°C, Среднее: {avg_temp:.2f}°C, Аномалия: {is_anomaly}")

# Реакция на аномалии
if is_anomaly:
# Здесь может быть код для оповещения или автоматической реакции
client.publish("alerts/temperature", json.dumps({
"sensor": msg.topic.split('/')[-1],
"value": temperature,
"threshold": avg_temp + 2 * std_temp,
"timestamp": timestamp
}))

except Exception as e:
print(f"Ошибка обработки сообщения: {e}")

# Настройка MQTT-клиента
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message

# Подключение к брокеру
client.connect("mqtt.example.com", 1883, 60)

# Запуск цикла обработки
client.loop_forever()

При работе с данными IoT важно учитывать следующие аспекты:

Проблема Решение с использованием Python
Неполные или пропущенные данные Методы заполнения из pandas (interpolate, fillna) или прогнозирование с ARIMA
Шумы и выбросы Фильтры Калмана (pykalman) или медианные фильтры (scipy.signal)
Разная частота поступления данных Ресемплинг с помощью pandas.resample()
Большие объемы данных Потоковая обработка с Dask или Apache Spark (PySpark)
Предиктивная аналитика Машинное обучение с scikit-learn, Prophet или TensorFlow

Для визуализации данных в реальном времени Python предлагает несколько мощных фреймворков. Один из самых популярных — Dash от Plotly, который позволяет создавать интерактивные аналитические дашборды с минимальным количеством кода:

Python
Скопировать код
# Простой дашборд для визуализации температурных данных
import dash
from dash import dcc, html
from dash.dependencies import Input, Output
import plotly.graph_objs as go
from influxdb import InfluxDBClient
import pandas as pd

# Соединение с базой данных
influx_client = InfluxDBClient('localhost', 8086, 'admin', 'password', 'iot_sensors')

# Создание приложения Dash
app = dash.Dash(__name__)

# Макет дашборда
app.layout = html.Div([
html.H1("Мониторинг температуры в реальном времени"),

dcc.Graph(id='live-temperature-graph'),

dcc.Interval(
id='interval-component',
interval=5*1000, # в миллисекундах (обновление каждые 5 секунд)
n_intervals=0
)
])

# Функция обновления графика
@app.callback(
Output('live-temperature-graph', 'figure'),
[Input('interval-component', 'n_intervals')]
)
def update_graph(n):
# Запрос последних данных из InfluxDB
query = 'SELECT time, value, moving_avg FROM temperature WHERE time > now() – 1h'
result = influx_client.query(query)

# Преобразование в DataFrame
points = list(result.get_points())
df = pd.DataFrame(points)

if not df.empty:
df['time'] = pd.to_datetime(df['time'])

# Создание графика
figure = {
'data': [
go.Scatter(
x=df['time'],
y=df['value'],
name='Температура',
mode='lines+markers',
marker=dict(color='red')
),
go.Scatter(
x=df['time'],
y=df['moving_avg'],
name='Скользящее среднее',
mode='lines',
line=dict(color='blue', dash='dash')
)
],
'layout': go.Layout(
title='Температурные показатели за последний час',
xaxis=dict(title='Время'),
yaxis=dict(title='Температура (°C)'),
hovermode='closest'
)
}
else:
figure = {
'data': [],
'layout': go.Layout(
title='Нет данных за последний час'
)
}

return figure

if __name__ == '__main__':
app.run_server(debug=True)

Используя эти инструменты и подходы, вы можете создать полноценную систему сбора и анализа данных с IoT-устройств. Python делает этот процесс доступным даже для разработчиков без глубоких знаний в области анализа данных, предоставляя интуитивно понятные API и обширную документацию. Комбинирование IoT-сенсоров с алгоритмами машинного обучения открывает путь к созданию по-настоящему интеллектуальных систем, способных не только собирать данные, но и принимать на их основе автономные решения. 🤖

Создание полноценного IoT-проекта: от концепции до реализации

Создание комплексного IoT-проекта требует не только технических знаний, но и методического подхода к проектированию. Рассмотрим процесс разработки полноценной IoT-системы на примере умной теплицы — проекта, объединяющего различные аспекты IoT-разработки на Python. 🌱

Этапы разработки IoT-проекта:

  1. Определение требований и проектирование системы
  2. Выбор аппаратных компонентов и технологий
  3. Разработка программного обеспечения для устройств
  4. Создание серверной части и базы данных
  5. Разработка пользовательского интерфейса
  6. Тестирование и отладка системы
  7. Развертывание и масштабирование решения

Для нашего примера создадим умную теплицу с автоматическим контролем климата, системой полива и удаленным мониторингом. Архитектура системы будет включать следующие компоненты:

  • Сенсорный узел: ESP32 с MicroPython, датчики температуры, влажности почвы и воздуха, освещенности
  • Актуаторы: системы полива, вентиляции, освещения и обогрева
  • Шлюз: Raspberry Pi, агрегирующий данные и обеспечивающий локальное управление
  • Облачный сервер: Flask API, база данных InfluxDB, алгоритмы предсказания
  • Пользовательский интерфейс: веб-приложение и мобильное приложение

Рассмотрим ключевой компонент — код для контроллера теплицы на ESP32 с MicroPython:

Python
Скопировать код
# greenhouse_controller.py
import time
import machine
import dht
import network
from umqtt.robust import MQTTClient
import ujson as json

# Настройка пинов
DHT_PIN = 15
SOIL_MOISTURE_PIN = 32
LIGHT_SENSOR_PIN = 33
PUMP_PIN = 25
FAN_PIN = 26
LIGHT_PIN = 27
HEATER_PIN = 14

# Инициализация сенсоров
dht_sensor = dht.DHT22(machine.Pin(DHT_PIN))
soil_sensor = machine.ADC(machine.Pin(SOIL_MOISTURE_PIN))
soil_sensor.atten(machine.ADC.ATTN_11DB) # Полный диапазон: 3.3V
light_sensor = machine.ADC(machine.Pin(LIGHT_SENSOR_PIN))
light_sensor.atten(machine.ADC.ATTN_11DB)

# Инициализация актуаторов
pump = machine.Pin(PUMP_PIN, machine.Pin.OUT)
fan = machine.Pin(FAN_PIN, machine.Pin.OUT)
light = machine.Pin(LIGHT_PIN, machine.Pin.OUT)
heater = machine.Pin(HEATER_PIN, machine.Pin.OUT)

# Идентификатор устройства
DEVICE_ID = "greenhouse_1"

# Настройки MQTT
MQTT_HOST = "192.168.1.10"
MQTT_PORT = 1883
MQTT_USER = "greenhouse"
MQTT_PASSWORD = "secret"
MQTT_TOPIC_TELEMETRY = f"devices/{DEVICE_ID}/telemetry"
MQTT_TOPIC_CONTROL = f"devices/{DEVICE_ID}/control"

# Настройки Wi-Fi
WIFI_SSID = "YOUR_WIFI_SSID"
WIFI_PASSWORD = "YOUR_WIFI_PASSWORD"

# Функция подключения к Wi-Fi
def connect_wifi():
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
if not wlan.isconnected():
print(f"Connecting to {WIFI_SSID}...")
wlan.connect(WIFI_SSID, WIFI_PASSWORD)
max_wait = 10
while max_wait > 0:
if wlan.isconnected():
break
max_wait -= 1
time.sleep(1)
if wlan.isconnected():
print(f"Connected to {WIFI_SSID}")
print(f"IP: {wlan.ifconfig()[0]}")
else:
print(f"Failed to connect to {WIFI_SSID}")
machine.reset() # Перезагружаемся при неудаче
return wlan

# Функция для чтения данных с сенсоров
def read_sensors():
try:
# Чтение DHT22
dht_sensor.measure()
temperature = dht_sensor.temperature()
humidity = dht_sensor.humidity()

# Чтение влажности почвы (преобразование в проценты)
soil_value = soil_sensor.read()
soil_moisture = 100 – (soil_value / 4095 * 100)

# Чтение освещенности (преобразование в проценты)
light_value = light_sensor.read()
light_level = light_value / 4095 * 100

return {
"temperature": temperature,
"humidity": humidity,
"soil_moisture": soil_moisture,
"light_level": light_level,
"timestamp": time.time()
}
except Exception as e:
print(f"Error reading sensors: {e}")
return None

# Обработчик MQTT-сообщений для управления актуаторами
def on_control_message(topic, message):
try:
control = json.loads(message)

# Управление насосом
if "pump" in control:
pump.value(1 if control["pump"] else 0)
print(f"Pump set to: {control['pump']}")

# Управление вентилятором
if "fan" in control:
fan.value(1 if control["fan"] else 0)
print(f"Fan set to: {control['fan']}")

# Управление освещением
if "light" in control:
light.value(1 if control["light"] else 0)
print(f"Light set to: {control['light']}")

# Управление обогревателем
if "heater" in control:
heater.value(1 if control["heater"] else 0)
print(f"Heater set to: {control['heater']}")

# Подтверждение получения команды
client.publish(
f"devices/{DEVICE_ID}/status",
json.dumps({"received": control, "status": "ok"})
)

except Exception as e:
print(f"Error processing control message: {e}")
client.publish(
f"devices/{DEVICE_ID}/status",
json.dumps({"error": str(e)})
)

# Инициализация MQTT-клиента
def init_mqtt():
global client

# Создание клиента
client = MQTTClient(
DEVICE_ID, 
MQTT_HOST,
port=MQTT_PORT,
user=MQTT_USER,
password=MQTT_PASSWORD,
keepalive=30
)

# Настройка обратного вызова
client.set_callback(on_control_message)

# Подключение
try:
client.connect()
print(f"Connected to MQTT broker at {MQTT_HOST}")

# Подписка на топик управления
client.subscribe(MQTT_TOPIC_CONTROL)
print(f"Subscribed to {MQTT_TOPIC_CONTROL}")

return client
except Exception as e:
print(f"Failed to connect to MQTT: {e}")
time.sleep(10)
machine.reset()

# Автоматическое управление теплицей
def auto_control(sensor_data):
# Алгоритм управления поливом
if sensor_data["soil_moisture"] < 30: # Если почва слишком сухая
pump.value(1) # Включаем полив
print("Auto: Pump ON (soil too dry)")
elif sensor_data["soil_moisture"] > 70: # Если почва достаточно влажная
pump.value(0) # Выключаем полив
print("Auto: Pump OFF (soil moist enough)")

# Алгоритм управления вентиляцией
if sensor_data["humidity"] > 85: # Если влажность слишком высокая
fan.value(1) # Включаем вентиляцию
print("Auto: Fan ON (humidity too high)")
elif sensor_data["humidity"] < 60: # Если влажность нормализовалась
fan.value(0) # Выключаем вентиляцию
print("Auto: Fan OFF (humidity normal)")

# Алгоритм управления освещением
if sensor_data["light_level"] < 20 and 6 <= time.localtime()[3] < 20: # Если темно в дневное время
light.value(1) # Включаем освещение
print("Auto: Light ON (too dark during day)")
elif sensor_data["light_level"] > 50 or time.localtime()[3] >= 20 or time.localtime()[3] < 6:
light.value(0) # Выключаем освещение ночью или при достаточной освещенности
print("Auto: Light OFF (night time or bright enough)")

# Алгоритм управления обогревом
if sensor_data["temperature"] < 18: # Если слишком холодно
heater.value(1) # Включаем обогрев
print("Auto: Heater ON (too cold)")
elif sensor_data["temperature"] > 25: # Если достаточно тепло
heater.value(0) # Выключаем обогрев
print("Auto: Heater OFF (warm enough)")

# Главная функция
def main():
# Подключение к Wi-Fi
wlan = connect_wifi()

# Инициализация MQTT
client = init_mqtt()

# Главный цикл
last_telemetry_time = 0
TELEMETRY_INTERVAL = 60 # Интервал отправки телеметрии (в секундах)

try:
while True:
# Проверка наличия входящих сообщений
client.check_msg()

# Чтение и отправка телеметрии с заданным интервалом
current_time = time.time()
if current_time – last_telemetry_time >= TELEMETRY_INTERVAL:
sensor_data = read_sensors()

if sensor_data:
# Отправка телеметрии
client.publish(MQTT_TOPIC_TELEMETRY, json.dumps(sensor_data))
print(f"Telemetry sent: {sensor_data}")

# Автоматическое управление
auto_control(sensor_data)

last_telemetry_time = current_time

# Короткая пауза для экономии ресурсов
time.sleep(1)

except Exception as e:
print(f"Runtime error: {e}")
time.sleep(10)
machine.reset()

# Запуск программы
if __name__ == "__main__":
main()

На стороне сервера реализуем Flask API для приема и обработки данных:

Python
Скопировать код
# server/app.py
from flask import Flask, request, jsonify
from influxdb import InfluxDBClient
import datetime
import pandas as pd
from sklearn.ensemble import RandomForestRegressor
import schedule
import threading
import time

app = Flask(__name__)

# Подключение к InfluxDB
influx_client = InfluxDBClient(host='localhost', port=8086)
influx_client.switch_database('greenhouse_db')

# Модель машинного обучения для предсказания
model = None

# Функция для обучения модели предсказания на исторических данных
def train_prediction_model():
global model

# Запрос исторических данных за последнюю неделю
query = 'SELECT * FROM temperature,humidity,soil_moisture,light_level WHERE time > now() – 7d'
result = influx_client.query(query)

# Преобразование в DataFrame
data = []
for point in result.get_points():
data.append(point)

if not data:
print("No historical data available for training")
return

df = pd.DataFrame(data)
df['time'] = pd.to_datetime(df['time'])

# Извлечение признаков для машинного обучения
df['hour'] = df['time'].dt.hour
df['day_of_week'] = df['time'].dt.dayofweek

# Подготовка данных для обучения
X = df[['hour', 'day_of_week', 'humidity', 'light_level']]
y = df['temperature'] # Предсказываем температуру

# Обучение модели
model = RandomForestRegressor(n_estimators=100, random_state=42)
model.fit(X, y)

print("Prediction model trained successfully")

# Планирование ежедневного переобучения модели
def schedule_model_training():
schedule.every().day.at("02:00").do(train_prediction_model)

while True:
schedule.run_pending()
time.sleep(60)

# API-конечная точка для приема телеметрии
@app.route('/api/telemetry', methods=['POST'])
def receive_telemetry():
data = request.json

if not data or 'device_id' not in data:
return jsonify({"error": "Invalid data format"}), 400

device_id = data['device_id']
telemetry = data['telemetry']
timestamp = datetime.datetime.utcnow().isoformat()

# Подготовка данных для InfluxDB
points = []

for key, value in telemetry.items():
if key != 'timestamp': # Исключаем timestamp из устройства
point = {
"measurement": key,
"tags": {
"device_id": device_id
},
"time": timestamp,
"fields": {
"value": float(value)
}
}
points.append(point)

# Запись в InfluxDB
if points:
influx_client.write_points(points)

return jsonify({"status": "ok"})

# API-конечная точка для отправки управляющих команд
@app.route('/api/control/<device_id>', methods=['POST'])
def send_control(device_id):
# Здесь должна быть логика для отправки команд через MQTT
# В реальном приложении это будет интегрировано с MQTT-брокером

return jsonify({"status": "command sent", "device_id": device_id})

# API-конечная точка для получения предсказаний
@app.route('/api/predict/<device_id>', methods=['GET'])
def get_prediction(device_id):
if model is None:
return jsonify({"error": "Prediction model not trained yet"}), 503

# Получение последних данных для устройства
query = f'SELECT last("value") FROM humidity,light_level WHERE "device_id" = \'{device_id}\' GROUP BY "measurement"'
result = influx_client.query(query)

# Подготовка данных для предсказания
current_hour = datetime.datetime.now().hour
current_day = datetime.datetime.now().weekday()

humidity = None
light_level = None

for point in result.get_points(measurement='humidity'):
humidity = point['last']

for point in result.get_points(measurement='light_level'):
light_level = point['last']

if humidity is None or light_level is None:
return jsonify({"error": "Insufficient data for prediction"}), 400

# Выполнение предсказания
prediction = model.predict([[current_hour, current_day, humidity, light_level]])

return jsonify({
"device_id": device_id,
"predicted_temperature": float(prediction[0]),
"timestamp": datetime.datetime.now().isoformat()
})

if __name__ == '__main__':
# Запуск обучения модели в отдельном потоке
threading.Thread(target=schedule_model_training, daemon=True).start()

# Первоначальное обучение модели
train_prediction_model()

# Запуск Flask-сервера
app.run(host='0.0.0.0', port=5000)

Для завершения проекта необходимо создать интерфейс пользователя. Можно использовать Dash или React для веб-интерфейса. Вот пример простого дашборда на Dash:

Python
Скопировать код
# dashboard/app.py
import dash
from dash import dcc, html
from dash.dependencies import Input, Output, State
import plotly.graph_objs as go
from influxdb import InfluxDBClient
import pandas as pd
import requests
import datetime

# Подключение к InfluxDB
influx_client = InfluxDBClient(host='localhost', port=8086)
influx_client.switch_database('greenhouse_db')

# Создание приложения Dash
app = dash.Dash(__name__, meta_tags=[
{"name": "viewport", "content": "width=device-width, initial-scale=1"}
])

# Макет дашборда
app.layout = html.Div([
html.H1("Умная теплица: Система мониторинга и управления"),

html.Div([
html.Div([
html.H3("Выбор устройства"),
dcc.Dropdown(
id='device-selector',
options=[
{'label': 'Теплица 1', 'value': 'greenhouse_1'},
{'label': 'Теплица 2', 'value': 'greenhouse_2'}
],
value='greenhouse_1'
)
], className="four columns"),

html.Div([
html.H3("Период времени"),
dcc.RadioItems(
id='time-range',
options=[
{'label': 'Последний час', 'value': '1h'},
{'label': 'Последний день', 'value': '1d'},
{'label': 'Последняя неделя', 'value': '7d'}
],
value='1h'
)
], className="four columns"),

html.Div([
html.H3("Текущий статус

Загрузка...