Фильтрация DataFrame в Pyspark с помощью оператора IN в SQL
Быстрый ответ
Если вам нужно отфильтровать строки в DataFrame PySpark, где определённый столбец включает значения из указанного списка, вы можете использовать метод isin()
:
# Список значений для фильтрации
vals = [1, 3, 5]
# Фильтрация DataFrame, где столбец 'col' включает значения из списка 'vals'
filtered_df = df.filter(df['col'].isin(vals))
Такой подход вернёт только строки, значение в которых в столбце 'col' равно 1, 3 или 5.
Работа с различными типами данных и нечувствительность к регистру
Не забывайте, что сравнивать стоит только данные одного типа — целые числа с целыми, строки со строками. Для того чтобы игнорировать регистр при сравнении строк, примените метод lower()
:
from pyspark.sql.functions import col, lower
# Пример со списком фруктов
vals_str = ["Apple", "banana", "Cherry"]
# Фильтрация DataFrame с учетом игнорирования регистра
filtered_df_str = df.filter(lower(col("fruit")).isin([x.lower() for x in vals_str]))
Использование broadcast для оптимизации производительности
При работе с длинными списками значений рекомендуется использовать функцию broadcast
в Spark, чтобы улучшить производительность:
from pyspark.sql.functions import broadcast
# Длинный список значений
vals_large = range(10000)
# Используем broadcast для улучшения производительности фильтра
bc_vals = spark.sparkContext.broadcast(list(vals_large))
filtered_df_large = df.filter(col("id").isin(bc_vals.value))
Визуализация
Для наглядности применения фильтра вы можете представить его как ключ, открывающий посредственно нужные вам ценности из тайника:
Содержимое тайника: [💎, 💰, 📿, 🏺, 🗡]
Нужные сокровища: [💰, 🗡]
Фильтр с условием IN подберет нужные ценности:
df.filter(col("gem").isin(["💰", "🗡"]))
Таким образом, вы получите только необходимые вам сокровища.
Результат: [💰, 🗡]
Использование динамических и безопасных запросов для умного выбора
Метод isin()
позволяет вам использовать динамические списки значений и предотвращать возникающие при ручном вводе ошибки:
# Пусть список значений приходит из некой зашифрованной передачи данных
dynamic_vals = get_dynamic_value_list_somehow() # Источник этих данных остается в сокровенной тайне
filtered_dynamic_df = df.filter(col("category").isin(dynamic_vals)) # Лишь нужные записи пройдут на нашу секретную вечеринку
Однако проявляйте осторожность при обработке переменных в SQL-коде из-за вероятности возникновения SQL-инъекций и других ошибок.
Проникнитесь с помощью explain
Чтобы понять, как DataFrame выполняет фильтрацию, можно привлечь метод explain()
:
# План выполнения запроса
filtered_df.explain()
Анализ полученного плана может помочь в оптимизации запросов.
Фильтры и их отношение к типам данных
Способ записи условий фильтра зависит от типа данных, с которыми вы работаете:
- Целые числа: значения передаются непосредственно в метод
isin()
. - Строки: значения передаются в одинарных кавычках, но чтобы сделать сравнение независимым от регистра, используется
lower()
.
Плавный переход между DSL и SQL
DSL DataFrame предлагает большую гибкость, но если вам привычнее SQL, вы можете создавать временные представления:
# Создаем временное представление
df.createOrReplaceTempView("df_table")
# Формируем SQL-запрос
query = """SELECT * FROM df_table WHERE col IN (1, 3, 5)"""
result = spark.sql(query)
Работая с динамическими запросами, остерегайтесь возможной уязвимости при использовании SQL по сравнению с DSL.
Полезные материалы
- Официальное руководство Databricks по обработке данных в PySpark DataFrames — подробное руководство по применению фильтрации DataFrame в PySpark.
- Использование условия IN в PySpark – статья на Medium — практические примеры использования условия IN.
- Мастерство фильтрации данных в PySpark – статья на Towards Data Science — подробное руководство по продвинутым стратегиям фильтрации данных в Spark.
- Официальная документация Apache Spark по функциям Spark SQL — все функции Spark SQL, включая те, что связаны с условием IN.