Принципи SOLID для Spark застосунків

Передмова🚀

У цій публікації блогу розглядається, чи може PySpark використовувати принципи SOLID для задач інженерії даних.

Ось серія статей про принципи SOLID у Python для задач інженерії даних, над якими я працюю:

Я також досліджував застосування функціонального програмування в інженерії даних у цих публікаціях:

Що таке Spark? 💥💡

Spark – це фреймворк для обробки великих обсягів даних, розподілених між декількома машинами одночасно. Він спочатку написаний на Scala, мові програмування, яка підтримує об’єктно-орієнтоване програмування (ООП) та функціональне програмування (ФП).

Spark в основному має справу з незмінними типами даних, такими як RDD та DataFrame. Якщо тип даних є незмінним, це означає, що після створення він не може бути змінений. Єдиний спосіб змінити їх – це створити абсолютно нову структуру, яка включає бажані зміни.

Що таке PySpark?💥🐍

PySpark – це Python API для взаємодії з сервісами, що надаються Spark. Це дозволяє розробникам використовувати потужність Spark, одночасно використовуючи простоту Python для полегшення створення надійних додатків, які обробляють великі обсяги даних.

Таким чином, програмісти на Python можуть залишатися з Python і використовувати можливості розподілених обчислень Spark за допомогою PySpark.

Об’єктно-орієнтоване програмування (ООП)👥

Об’єктно-орієнтоване програмування (ООП) – це парадигма програмування, яка наголошує на використанні об’єктів. В ООП дані зберігаються в екземплярах класів (а не в самих класах). Отже, клас повинен бути спочатку прочитаний (або інстанційований) в об’єкт, перш ніж дані можуть бути збережені в ньому. Дані, що зберігаються в екземплярі класу, називаються “станом (state)”.

ООП часто має справу зі змінними даними, тобто даними, які можна змінювати після їх створення.

🔋Переваги ООП з PySpark

  • Повторне використання 🔁 – Класи можна використовувати повторно для створення декількох нових об’єктів без переписування коду з нуля
  • Гнучкість ⚙️ – Заняття можуть бути адаптовані відповідно до мінливих потреб, що вимагаються програмою
  • Модульність 🧱 – код організовано в незалежні класи та методи, тому модифікація однієї частини коду не вплине на інші.

⚠️Challenges ООP з PySpark

  • Мутабельність 🔄 – Обробка мутабельних типів даних у розподіленому обчислювальному середовищі може спричинити проблеми, пов’язані зі станом, такі як спроби підтримувати узгодженість при обробці мутабельних даних на декількох вузлах
  • Важко використовувати успадкування або поліморфізм 🚫 – PySpark наразі не має достатньої підтримки цих концепцій ООП для побудови надійних конвеєрів за допомогою ООП
  • Порушення інкапсуляції 🔓- RDD та DataFrame схильні до порушення інкапсуляції в ООП, оскільки вони розкривають дані під час обробки в робочих вузлах

Коли слід використовувати ООP у PySpark?🤔💭🤔💭

TLDR: Коли потрібно підтримувати стан у всіх конвеєрах даних (data pipelines). 🗃️🔄

Хоча зберігання стану для розподілених обчислень часто не рекомендується (особливо для даних, що змінюються), існують випадки використання, коли це може бути виправдано. Давайте продемонструємо це на прикладах коду:

Ми можемо почати з налаштування логіки конфігурації, яка буде подавати дані нашому додатку Spark, коли ми будемо готові його налаштувати:

from pyspark import SparkConf
from pyspark.sql import SparkSession

spark_config = (SparkConf()
          .setAppName("Stephen's OOP App for Blog")
          .setMaster("local")                           # Cluster: Standalone
          .set("spark.executors.instances", 3)          # Executors: 3
          .set("spark.executors.cores", 5)              # CPU cores: 5
          .set("spark.memory.fraction", "0.5")          # Heap space fraction - 50% storage memory, 50% execution memory
          .set("spark.executor.memory", "2g")           # Execution memory: 2 GB    
         )

Отже, давайте розглянемо причини, чому ООП в PySpark має сенс:

1. Для належного управління системними ресурсами 💻🔄 – обробка системних ресурсів, таких як з’єднання з базою даних, конфіденційні файли і змінні середовища, може вимагати стандартизованого і безпечного протоколу для звільнення після того, як користувачі закінчать з ними працювати.

Хорошим прикладом цього може бути використання методів для запуску і зупинки сеансу Spark, наприклад, таким чином:

# Create a class to initialize and terminate Spark Session
class SparkSessionManager:
    def __init__(self):
        self.spark = None
	
	
# Create a method for starting the Spark Session
    def start_spark_session(self, config):
        self.spark = SparkSession.builder.config(conf=config).getOrCreate()
        print("Started Spark Session. ")
		

# Create a method for stopping the Spark Session
    def stop_spark_session(self):
        if self.spark is not None:
            self.spark.stop()
            print("Stopped Spark Session.")

Клас SparkSessionManager містить два методи:

  1. start_spark_session – для запуску Spark Session
  2. stop_spark_session – для зупинки Spark Session
from pyspark.sql import Row


# Instantiate object for Spark Session
spark_session_manager = SparkSessionManager()


# Activate Spark Session
spark_session_manager.start_spark_session(spark_config)


# Create dummy data 
data_1 = [
        Row(name='John', age=25, city='London'),
        Row(name='Matilda', age=44, city='Shanghai'),
        Row(name='Ashley', age=21, city='Lagos'),
        Row(name='Mike', age=19, city='Doncaster'),
        Row(name='Charlie', age=53, city='Abuja'),
        Row(name='Brian', age=27, city='New York'),
       ]


# Create a Spark dataframe
df_1 = spark_session_manager.spark.createDataFrame(data_1)


# Display data
df_1.show()


### Output:
+-------+---+---------+
|   name|age|     city|
+-------+---+---------+
|   John| 25|   London|
|Matilda| 44| Shanghai|
| Ashley| 21|    Lagos|
|   Mike| 19|Doncaster|
|Charlie| 53|    Abuja|
|  Brian| 27| New York|
+-------+---+---------+

Потім ми можемо прочитати дані в Spark DataFrame і виконати будь-яку операцію, яка нам потрібна.

Коли ми закінчимо, ми можемо завершити сеанс Spark:

# Terminate Spark session
spark_session_manager.stop_spark_session()

2. Для створення інструментів, які спільно використовують поведінку при перетвореннях 👥🛠️- Цей стиль програмування особливо корисний для створення додатків, які спільно використовують стан при перетвореннях даних. Прикладом можуть слугувати інструменти машинного навчання, які потребують даних, що зберігаються в класі, для виконання прогнозів у реальному часі.

Давайте додамо більше даних, щоб продемонструвати цей варіант використання:

# Create more dummy data 
data_2 = [
        Row(name='Sam', age=65, city='Sydney'),
        Row(name='Louisa', age=24, city='Gatwick'),
        Row(name='Zeeshan', age=31, city='Amsterdam'),
        Row(name='Todd', age=58, city='Paris'),
        Row(name='Samuel', age=26, city='Cape Town'),
        Row(name='Michaella', age=44, city='Toronto'),
       ]


# Create another Spark dataframe
df_2 = spark_session_manager.spark.createDataFrame(data_2)



# Display data
df_2.show()


### Output:
+---------+---+---------+
|     name|age|     city|
+---------+---+---------+
|      Sam| 65|   Sydney|
|   Louisa| 24|  Gatwick|
|  Zeeshan| 31|Amsterdam|
|     Todd| 58|    Paris|
|   Samuel| 26|Cape Town|
|Michaella| 44|  Toronto|
+---------+---+---------+

Ми можемо використовувати метод transform_dataframe з класу PreprocessingManager, щоб застосувати ту саму послідовність перетворень до кожного DataFrame, що зчитується у Spark:

from pyspark.sql import DataFrame


# Create a class for pre-processing data 
class PreprocessingManager:
    def __init__(self, df: DataFrame):
        self.df = df
        
    # Create a method for applying custom transformation tasks
    def transform_dataframe(self):
        self.df = self.df.dropna()
        self.df = self.df.filter(self.df["age"] > 30)
        
        return self.df


# Instantiate the object for preprocessing data
pre_processor_1    =   PreprocessingManager(df_1)
pre_processor_2    =   PreprocessingManager(df_2)

Отже, застосуємо перетворення – почнемо з першого DataFrame:

# Transform the dataframes 

# 1. 
transformed_df_1   =   pre_processor_1.transform_dataframe()
transformed_df_1.show()


#### Output 1:
+-------+---+--------+
|   name|age|    city|
+-------+---+--------+
|Matilda| 44|Shanghai|
|Charlie| 53|   Abuja|
+-------+---+--------+

…а тепер другий …:

# 2.
transformed_df_2   =   pre_processor_2.transform_dataframe()
transformed_df_2.show()


#### Output 2:
+---------+---+---------+
|     name|age|     city|
+---------+---+---------+
|      Sam| 65|   Sydney|
|  Zeeshan| 31|Amsterdam|
|     Todd| 58|    Paris|
|Michaella| 44|  Toronto|
+---------+---+---------+

3. Для інкапсуляції складних операцій 📦🔐 – хоча це звучить як протиріччя останньому пункту для “⚠️Challenges ООП з PySpark”, інкапсуляція в PySpark все ще можлива для організації коду в методи для:

  • приховування внутрішнього стану конкретних операцій, а також
  • розкриття необхідних функцій, наприклад, для перевірки даних
# Create a class for validating data 
class IDataValidator:
    
    
    # Create a method for checking if the field selected is positive
    @staticmethod
    def check_if_data_is_positive(df, column):
        return df.filter(df[column] > 0)
    
    
    # Create a method for checking the people over 40 years of age
    @staticmethod
    def check_people_over_40(df, age_field):
        return df.filter(df[age_field] > 40)
    
    
    # Create a method for checking if the field contains any NULL values
    @staticmethod
    def check_if_column_contains_null(df, column):
        return df.filter(df[column].isNotNull())

Тепер запустимо перевірки, починаючи з першої:

# Run 1st check 
validated_data_1 = IDataValidator.check_if_data_is_positive(df_1, "age")
validated_data_1.show()


### Output 1: 
+-------+---+---------+
|   name|age|     city|
+-------+---+---------+
|   John| 25|   London|
|Matilda| 44| Shanghai|
| Ashley| 21|    Lagos|
|   Mike| 19|Doncaster|
|Charlie| 53|    Abuja|
|  Brian| 27| New York|
+-------+---+---------+

…а тепер другий:

# Run 2nd check 
validated_data_2  = IDataValidator.check_people_over_40(df_2, "age")
validated_data_2.show()


## Output 2:
+---------+---+-------+
|     name|age|   city|
+---------+---+-------+
|      Sam| 65| Sydney|
|     Todd| 58|  Paris|
|Michaella| 44|Toronto|
+---------+---+-------+

Логіка кожного методу абстрагована від користувача, а сам метод можна викликати для перевірки даних. Користувачеві не потрібно знати, “як” метод виконує свою роботу, йому потрібно лише викликати сам метод.

Функціональне програмування (FP)🌟

Функціональне програмування (ФП) – це розділ програмування, який робить акцент на використанні функцій і заохочує використання незмінних типів даних.

🔋Переваги ФП з PySpark

  • Незмінність 🛡- Акцент ФП на незмінності узгоджується з незмінними структурами даних Spark, що зменшує проблеми, пов’язані зі станом під час паралельних обчислень
  • Зрозумілість 📖- ФП сприяє використанню незалежних функцій, що полегшує читання та підтримку кодової бази
  • Обчислення без стану 🌍 – Функції не зосереджуються на підтримці стану, тому це дозволяє операціям PySpark працювати незалежно, не турбуючись про побічні ефекти

⚠️Challenges ФП з PySpark

  • Крута крива навчання 🎢📚- Для тих, хто прийшов з ООП, принципи ПФ можуть бути складними для засвоєння
  • Обмежений інструментарій 🧰- Існує не так багато бібліотек, які підтримують FP для PySpark, що обмежує можливості для надійних конвеєрів даних
  • Ресурсомісткі 🛠 – Акцент FP на незмінності означає, що після кожної трансформації створюються нові дані, що збільшує використання ресурсів🍔.

Коли я повинен використовувати ФП з PySpark?🤔💭

TLDR: Коли завдання з трансформації даних потрібно виконувати з незмінними даними 🔒📋.

Давайте розглянемо це більш детально:

1. Для запуску завдань Spark на незмінних даних 📊💎- ФП найкраще працює, коли має справу з незмінними типами даних. Мутація даних у розподіленому середовищі може призвести до перегонів даних, коли декілька потоків одночасно отримують доступ до одних і тих самих даних і обробляють їх, що може призвести до неочікуваних помилок у процесах перетворення даних. Це ускладнює процес налагодження, оскільки програма Spark може видавати результати, які важко (або неможливо) повторити.

Ось приклад обробки незмінних даних:

from pyspark.sql import Row
from pyspark.sql import functions as F


# Create dummy data 
data_1 = [
        Row(name='John', age=25, city='London'),
        Row(name='Matilda', age=44, city='Shanghai'),
        Row(name='Ashley', age=21, city='Lagos'),
        Row(name='Mike', age=19, city='Doncaster'),
        Row(name='Charlie', age=53, city='Abuja'),
        Row(name='Brian', age=27, city='New York'),
       ]


# Create a Spark dataframe
df_1 = spark.createDataFrame(data_1)


# Transform data
calculate_dob = F.expr("date_add(current_date(), -cast(age * 365 as int))")
df_2 = df_1.withColumn("date_of_birth", calculate_dob)

Тут ми створюємо набір даних data_1 і фрейм даних Spark, df_1. Щоб додати ще один стовпець, ми створюємо користувацьку операцію за допомогою F.expr і передаємо її в іншу операцію, яка створює інший фрейм даних, df_2.

Таким чином, замість того, щоб змінювати існуючий фрейм даних df_1, ми копіюємо його, додаємо до нього поле date_of_birth і називаємо його df_2. Такий підхід втілює концепцію незмінності, зменшуючи ризик перегонів даних і полегшуючи налагодження.

Результат:

# Display data
df_2.show()


### Output: 
+-------+---+---------+-------------+
|   name|age|     city|date_of_birth|
+-------+---+---------+-------------+
|   John| 25|   London|   1998-06-11|
|Matilda| 44| Shanghai|   1979-06-16|
| Ashley| 21|    Lagos|   2002-06-10|
|   Mike| 19|Doncaster|   2004-06-09|
|Charlie| 53|    Abuja|   1970-06-18|
|  Brian| 27| New York|   1996-06-11|
+-------+---+---------+-------------+

Як ви можете бачити, df_2 все ще зберігає існуючі дані з df_1, але тепер до них додано поле date_of_birth. Таким чином, df_1 залишається незмінним, і принцип незмінності успішно застосовано.

2. Для побудови конвеєрів з передбачуваними виходами 🧩🔍 – через те, що функції не мають стану у ФП, вихід функції повністю залежить від її входу. Такий рівень передбачуваності корисний для тестування та усунення несправностей розподілених систем

from pyspark.sql import DataFrame

# Create function to change a column's values to uppercase
def convert_to_uppercase(df: DataFrame, column: str) -> DataFrame:
    return df.withColumn(f"{column}_in_uppercase", F.upper(df[column]))

# Apply operation 
transformed_df_1 = convert_to_uppercase(df_1, "name")

Тут ми визначаємо функцію convert_to_uppercase, яка отримує на вхід DataFrame і назву стовпця, а потім повертає новий DataFrame зі значеннями вибраного стовпця, перетвореними у верхній регістр.

Ця функція не зберігає жодних даних і, відповідно, не зберігає жодного стану. Іншими словами, її результат залежить виключно від вхідних параметрів.

Ця функція convert_to_uppercase застосовується до фрейму даних df_1 і поля name. Кожного разу, коли ми передаємо ці вхідні дані у convert_to_uppercase, вона завжди повертатиме ті самі значення у верхньому регістрі для стовпчика name, що робить операцію передбачуваною і надійною.

У результаті ми отримаємо наступний результат:

# Display results
transformed_df_1.show()


### Output:

+-------+---+---------+-----------------+
|   name|age|     city|name_in_uppercase|
+-------+---+---------+-----------------+
|   John| 25|   London|             JOHN|
|Matilda| 44| Shanghai|          MATILDA|
| Ashley| 21|    Lagos|           ASHLEY|
|   Mike| 19|Doncaster|             MIKE|
|Charlie| 53|    Abuja|          CHARLIE|
|  Brian| 27| New York|            BRIAN|
+-------+---+---------+-----------------+

3. Для використання композиції при побудові складних потоків даних 🔗🔨- композиція функцій дозволяє виходам однієї функції бути входами іншої функції, що полегшує створення та читання вихідного коду конвеєрів зі складною поведінкою.

Ми скористаємося функцією трансформації в PySpark, яка дозволяє нам зв’язати в ланцюжок кілька операцій трансформації і застосувати їх до одного фрейму даних.

from typing import List
from pyspark.sql import DataFrame


# Create function to sort the dataframe by a list of columns
def sort_dataframe(df: DataFrame, columns: List[str]) -> DataFrame:
    return df.sort(*columns)


# Create a function for filtering by a minimum age value
def filter_by_age(df: DataFrame, min_age: int) -> DataFrame:
    return df.filter(df["age"] > min_age)


# Create lambda functions to apply one-line transformations
apply_case_conversions  =  lambda df: convert_to_uppercase(df, "name")
apply_sorting_logic     =  lambda df: sort_dataframe(df, ["city", "name"])
apply_age_filtering     =  lambda df: filter_by_age(df, 20)


# Use composition via "transform" operaton to transform dataframe
transformed_df_2   = (df_1
                   .transform(apply_case_conversions)
                   .transform(apply_sorting_logic)
                   .transform(apply_age_filtering)
                    )

Функція transform використовується 3 рази для формування функцій вищого порядку для задач перетворення. Розглянемо їх застосування:

  • transform(apply_case_conversions) – Фрейм даних df_1 передається у функцію apply_case_conversions для перетворення вибраного поля у верхній регістр, а потім результат цієї операції використовується як вхідні дані для наступної функції.
  • transform(apply_sorting_logic) – ця функція приймає вихідні дані попередньої операції (тобто фрейм даних df_1 зі стовпчиком “name” у верхньому регістрі) і сортує їх за полями city та name. Вихідні дані передаються в наступну функцію.
  • transform(apply_age_filtering) – ця функція приймає вихідні дані попередньої операції (тобто кадр даних df_1 із застосованими операціями перетворення регістру та сортування), а потім фільтрує рядки на основі стовпця віку і повертає кінцевий перетворений кадр даних.

Отже, ця композиція дає такі результати:

# Display results
transformed_df_2.show()


### Output:
+-------+---+--------+-----------------+
|   name|age|    city|name_in_uppercase|
+-------+---+--------+-----------------+
|Charlie| 53|   Abuja|          CHARLIE|
| Ashley| 21|   Lagos|           ASHLEY|
|   John| 25|  London|             JOHN|
|  Brian| 27|New York|            BRIAN|
|Matilda| 44|Shanghai|          MATILDA|
+-------+---+--------+-----------------+

Що таке принципи SOLID?📊

Принципи SOLID – це набір принципів проектування, які допомагають створювати програмні рішення, які легко тестувати, читати і підтримувати протягом їх життєвого циклу.

Ви можете прочитати більше про це в першій частині цієї серії статей тут.

1. Принцип єдиної відповідальності (SRP) 📌🧰 📌🧰

Принцип єдиної відповідальності (SRP Single Responsibility Principle) стверджує, що модуль повинен змінюватися лише з однієї причини. Іншими словами, кожен клас, метод або функція повинні мати лише одну відповідальність.

У контексті ООП це означає, що кожен клас або метод повинен бути розділений за обов’язками. У ФП це означає, що кожна функція повинна бути розділена за обов’язками.

Давайте розглянемо, як обидва підходи по-різному застосовуються у PySpark:

A. Застосування ООП👥📌

class IDatabaseConnector:
    def __init__(self, connection_string):
        self.connection_string = connection_string
        
    def connect_to_db(self):
        print("Connecting to database...")
    
    def close_connection_to_db(self):
        print("Closing connection to database...")


db_connector = IDatabaseConnector(connection_string="HOST: local, PORT: 1234")
db_connector.connect_to_db()

Тут ми створюємо клас IDatabaseConnector, який буде обробляти з’єднання і роз’єднання з базою даних (за допомогою методів connect_to_db і close_connection_to_db відповідно) – можна з упевненістю сказати, що його єдина відповідальність полягає в управлінні з’єднанням з базою даних, таким чином, дотримуючись SRP.

### Output:
Connecting to database...

B. Застосування ФП 🌟📌

from pyspark.sql.functions import col, when


# Create function for depositing money 
def deposit_money(df, account_number: int, amount: float):
    return df.withColumn('balance', when(col('account_number') == account_number, col('balance') + amount).otherwise(col('balance')))


# Create function for withdrawing money
def withdraw_money(df, account_number: int, amount: float):
    if df.filter((col('account_number') == account_number) & (col('balance') < amount)).count() > 0:
        raise ValueError("Unfortunately your balance is insufficient for any withdrawals right now...")
    return df.withColumn('balance', when(col('account_number') == account_number, col('balance') - amount).otherwise(col('balance')))


# Create function for printing bank balance
def print_balance(df):
    return df.select(col('account_number'), col('balance'))


# Create function for changing account details
def change_account_number(df, current_account_number: int, new_account_number: int):
    return df.withColumn('account_number', when(col('account_number') == current_account_number, new_account_number).otherwise(col('account_number')))


# Create Spark dataframe with dummy data
bank_data = [{'account_number': 12345678, 'balance': 540.00}]
starting_df = spark.createDataFrame(bank_data)

Цей приклад містить кілька функцій з різними обов’язками щодо управління діями, пов’язаними з банківським рахунком, такими як внесення грошей (deposit_money), зняття грошей (withdraw_money), друк балансу (print_balance) і зміна номера рахунку (change_account_number).

# Deposit money 
df_1 = deposit_money(starting_df, account_number=12345678, amount=50)
my_current_account_1 = print_balance(df_1)
my_current_account_1.show()


## Output 1:
+--------------+-------+
|account_number|balance|
+--------------+-------+
|      12345678|  590.0|
+--------------+-------+


# Withdraw money
df_2 = withdraw_money(df_1, account_number=12345678, amount=500)
my_current_account_2 = print_balance(df_2)
my_current_account_2.show()


### Output 2:
+--------------+-------+
|account_number|balance|
+--------------+-------+
|      12345678|   90.0|
+--------------+-------+

Кожна функція є незалежною і має єдину відповідальність, що робить цей приклад відповідним до SRP з точки зору функціонального програмування.

2. Принцип “відкрити-закрити” (OCP – Open-close principle)🚪🔐

Цей принцип означає, що модуль повинен бути відкритим для розширення, але закритим для модифікації.

Іншими словами, ваш код повинен бути написаний таким чином, щоб нову функціональність можна було додати без необхідності вносити зміни в існуючий код.

В ООП це стосується класів і методів, тоді як у ФП це стосується лише функцій. Ізоляція класів, методів і функцій належним чином покращує зручність супроводу і тестування вашої кодової бази.

A. Застосування ООП👥📌

class IFileProcessor:
    def read_file(self, file: str):
        print(f"Reading {file} file into system... ")
    
    def check_file_details(self, file: str, row_count: int):
        print(f"File name: {file},  No of rows: {row_count} ")


file_processor = IFileProcessor()
file_processor.check_file_details(file="Test File", row_count=3)

З точки зору ООП, інтерфейс IFileProcessor у цьому прикладі відповідає ООП, тому що ми можемо додавати нові функції, не змінюючи існуючий код у класі. Якщо ми захочемо додати до класу метод load_file, це можна зробити, не перериваючи структуру коду методів read_file або check_file_details.

# Output:

File name: Test File,  No of rows: 3

B. Застосування ФП 🌟📌

from typing import Callable 
from pyspark.sql.functions import col

# Create the dummy data
data = [{'object': 'A', 'sensor': 'temperature'},
        {'object': 'B', 'sensor': 'ultrasonic'},
        {'object': 'C', 'sensor': 'infrared'} 
       ]


# Create the Spark dataframe
df = spark.createDataFrame(data)


# Create higher-order function that receives different sensors
def detect_with_sensor(df, *sensors: Callable) -> None:
    for i, sensor in enumerate(sensors):
        print(f'Sensor {i + 1} ')
        sensor(df)


# Express sensors as functions
def use_temperature_sensor(df) -> None:
    print("Detecting objects using temperature sensor...")
    df.filter(col('sensor') =='temperature').show()


def use_ultrasonic_sensor(df) -> None:
    print("Detecting objects using ultrasonic sensor...")
    df.filter(col('sensor')=='ultrasonic').show()


def use_infrared_sensor(df) -> None:
    print("Detecting objects using infrared sensor...")
    df.filter(col('sensor')=='infrared').show()

Функція detect_with_sensor узгоджується з OCP з точки зору функціонального програмування. Це пов’язано з тим, що вона може приймати на вхід різні датчики без будь-яких змін у самій функції detect_with_sensor.

Передамо функції у функцію detect_with_sensor і подивимося на результати:

# Detect the objects using different sensors
detect_with_sensor(df, use_infrared_sensor, use_ultrasonic_sensor, use_temperature_sensor)

### Output:

Sensor 1 
Detecting objects using infrared sensor...
+------+--------+
|object|  sensor|
+------+--------+
|     C|infrared|
+------+--------+


Sensor 2 
Detecting objects using ultrasonic sensor...
+------+----------+
|object|    sensor|
+------+----------+
|     B|ultrasonic|
+------+----------+


Sensor 3 
Detecting objects using temperature sensor...
+------+-----------+
|object|     sensor|
+------+-----------+
|     A|temperature|
+------+-----------+

Ми можемо додати стільки датчиків, скільки потрібно, не змінюючи існуючий код, тому він відкритий для розширення і закритий для модифікації.

3. Принцип заміщення Ліскова (ПЗЛ/LSP)👩‍🔬🔄

Принцип підстановки Ліскова (LSP – Liskov Substitution Principle) стверджує, що похідний модуль повинен мати можливість замінити базовий модуль без будь-якої несподіваної поведінки.

ООП інтерпретація цього правила говорить, що батьківський клас повинен мати можливість обмінюватися даними з дочірнім класом без будь-яких неочікуваних проблем, що виникають у коді. Інтерпретація ФП стверджує, що сигнатура початкової функції (вхідні та вихідні типи) повинна відповідати сигнатурі функції-підстановки без виникнення непередбачуваної поведінки.

A. Застосування ООП👥📌

class EnvConfig(ABC):
    @abstractmethod
    def get_config(self, object_name, no_of_objects):
        pass
    

class MongoDBConfig(EnvConfig):
    def get_config(self, object_name, no_of_objects):
        print(f"MONGODB - Database Name: {object_name},  No of tables: {no_of_objects} ")


class AzureBlobConfig(EnvConfig):
    def get_config(self, object_name, no_of_objects):
        print(f"AZURE BLOB - Blob Name: {object_name}, No of blob containers: {no_of_objects} ")

mongodb_config      =  MongoDBConfig()
azure_blob_config   =  AzureBlobConfig()

mongodb_config.get_config(object_name="dummy_db",no_of_objects=3)
azure_blob_config.get_config(object_name="dummy_blob", no_of_objects=16)

Клас EnvConfig є базовим класом (або батьківським класом), тоді як MongoDBConfig і AzureBlobConfig є похідними класами (або дочірніми класами). Похідні класи в цьому прикладі приймають однакову кількість вхідних параметрів у метод get_config, які також є спільними для базового та похідних класів.

Це означає, що ми не побачимо жодних нових винятків, якщо підставимо будь-який похідний клас там, де має працювати клас EnvConfig, і, отже, задовольнимо LSP у цьому прикладі.

### Output:

MONGODB - Database Name: dummy_db,  No of tables: 3 
AZURE BLOB - Blob Name: dummy_blob, No of blob containers: 16

B. Застосування ФП🌟📌

from typing import Callable
from pyspark.sql.functions import col, when


# Create function for using appliances with temperature controls
def use_temperature_controlled_item(df, 
                                    item: str, 
                                    turn_on: Callable[[str], None], 
                                    turn_off: Callable[[str], None], 
                                    change_temperature: Callable[[str], None]) -> None:
    df = turn_on(df, item)
    df = turn_off(df, item)
    df = change_temperature(df, item)
    
    return df


# Create function for turning refridgerator on
def turn_on_fridge(df, item: str) -> None:
    print(f"{item} turned on.")
    return (df.withColumn('status', 
                          when(col('item')==item, 'on')
                          .otherwise(col('status')
                                    )))


# Create function for turning refridgerator off
def turn_off_fridge(df, item: str) -> None:
    print(f"{item} turned off.")
    return (df.withColumn('status', 
                          when(col('item')==item, 'off')
                          .otherwise(col('status')
                                    )))


# Create function for changing refridgerator temperature
def change_temperature_of_fridge(df, item:str) -> None:
    print(f"{item} temperature changed.")
    return (df.withColumn('temperature', 
                         when(col('item')==item, col('temperature') + 1)
                         .otherwise(col('temperature')
                                   )))


# Create Spark dataframe
data = [{'item': 'fridge', 'status': 'off', 'temperature': 4}]
starting_df = spark.createDataFrame(data)

starting_df.show()

### Output: 
+------+------+-----------+
|  item|status|temperature|
+------+------+-----------+
|fridge|   off|          4|
+------+------+-----------+

З точки зору FP, узгодження з LSP означає забезпечення ідентичності сигнатури входу-виходу оригінальної функції з сигнатурою входу-виходу функції-замінника, щоб користувачеві не поверталася ніяка неочікувана поведінка.

У цьому випадку функція use_temperature_controlled_item має три входи – turn_on, turn_off і change_temperature, де кожна функція приймає на вхід Spark DataFrame і рядок, а на виході повертає інший Spark DataFrame. Ми також маємо три окремі функції для керування холодильником – turn_on_fridge, turn_off_fridge та change_temperature_of_fridge, які також приймають на вхід фрейм даних Spark та рядок, а на виході також повертають інший фрейм даних.

Це означає, що ці три функції можна замінити на функцію use_temperature_controlled_item без будь-яких помилок, що повертаються користувачеві, як продемонстровано нижче:

# Change the state of the refridgerator 
change_of_state_df = use_temperature_controlled_item(starting_df, 'fridge', turn_on_fridge, turn_off_fridge, change_temperature_of_fridge)
change_of_state_df.show()


### Output:

fridge turned on.
fridge temperature changed.
fridge turned off.
+------+------+-----------+
|  item|status|temperature|
+------+------+-----------+
|fridge|   off|          5|
+------+------+-----------+

4. Принцип розділення інтерфейсів (ISP)🧩⛔.

Принцип розділення інтерфейсів (ISP – Interface Segregation Principle) стверджує, що модуль не повинен бути змушений приймати функціональність, яку він не розробляв і не очікує використовувати.

В ООП цей принцип порушується, якщо клас містить методи, які його підклас не потребує або не має сенсу використовувати в реальному світі. У ФП цей принцип порушується, якщо функцію змушують залежати від функції, яка їй не потрібна для роботи.

A. Застосування ООП👥📌

class CloudUploader:
    @abstractmethod
    def upload_to_cloud(self):
        pass
    

class DBUploader:
    @abstractmethod
    def upload_to_db(self):
        pass
    

class S3DataUploader(CloudUploader):
    def upload_to_cloud(self):
        print("Uploading files to S3 bucket...")
     
   
        
class PostgresDataUploader(DBUploader):
    def upload_to_db(self):
        print("Uploading tables to Postgres database...")


s3_uploader = S3DataUploader()
s3_uploader.upload_to_cloud()

postgres_uploader = PostgresDataUploader()
postgres_uploader.upload_to_db()

Цей код PySpark слідує за провайдером, підтримуючи окремі інтерфейси для CloudUploader та DBUploader. Це означає, що будь-який клас, який використовує ці інтерфейси, не змушений використовувати методи, які йому не потрібні. Наприклад, клас S3DataUploader має завантажувати файли до S3-відра за допомогою методу upload_to_cloud, а клас PostgresDataUploader має завантажувати дані до бази даних Postgres за допомогою методу upload_to_db.

Отже, правильна реалізація цих об’єктів призведе до такого результату:

### Output:

Uploading files to S3 bucket...
Uploading tables to Postgres database...

Нам вдалося розділити функції так, щоб вони не заважали одна одній. Іншими словами, у цьому прикладі поділ завдань очевидний, тому що кожен клас робить те, що йому потрібно, дотримуючись ООП з точки зору ISP.

B. Заява на отримання ФП🌟📌

from typing import Callable
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import col, when


# Create function for applying transformation functions 
def apply_transformation_to_movie_data(transform_function: Callable[ [DataFrame], DataFrame ], df: DataFrame) -> DataFrame:
    return transform_function(df)


# Create Spark Session
spark = SparkSession.builder.appName("FP Application of ISP with Spark").getOrCreate()


# Create Spark dataframe
movie_data = [('Inception', 2010),
              ('Rush Hour', 1998),
              ('Avengers, Endgame', 2019),
              ('Bad Boyz', 1995),
              ('John Wick', 2014)
             ]

movie_df_1 = spark.createDataFrame(movie_data, ["title", "year"])

# Create function for adding ratings to movies 
def add_rating(df: DataFrame) -> DataFrame:
    return (df.withColumn("rating", when(col("title") == "Inception", 8.8)
                                   .when(col("title") == "Rush Hour", 7.0)
                                   .when(col("title") == "Avengers: Endgame", 8.4)
                                   .when(col("title") == "Bad Boyz", 6.9)
                                   .otherwise(7.4)
                         ))

# Create function for adding awards to movies 
def add_awards(df: DataFrame) -> DataFrame:
    return (df.withColumn("awards", when(col("title") == "Inception", 4)
                                   .when(col("title") == "Rush Hour", 0)
                                   .when(col("title") == "Avengers: Endgame", 0)
                                   .when(col("title") == "Bad Boyz", 0)
                                   .otherwise(0)
                         ))

У функціональному програмуванні ISP досягається шляхом використання чистих функцій, кожна з яких виконує окреме завдання. Функція apply_transformation_to_movie_data приймає функцію, яка виконує перетворення на фреймі даних Spark. Функції add_rating і add_awards – це функції, які виконують два окремих завдання. Ми передаємо їх обидві через функцію apply_transformation_to_movie_data у різний час, не змушуючи жодну з функцій трансформації використовувати інші операції, які вони не використовують.

# Display results
movie_df_2 = apply_transformation_to_movie_data(add_rating, movie_df_1)
movie_df_2.show()

### Output 1:
+-----------------+----+------+
|            title|year|rating|
+-----------------+----+------+
|        Inception|2010|   8.8|
|        Rush Hour|1998|   7.0|
|Avengers, Endgame|2019|   7.4|
|         Bad Boyz|1995|   6.9|
|        John Wick|2014|   7.4|
+-----------------+----+------+



movie_df_3 = apply_transformation_to_movie_data(add_awards, movie_df_2)
movie_df_3.show()

### Output 2:
+-----------------+----+------+------+
|            title|year|rating|awards|
+-----------------+----+------+------+
|        Inception|2010|   8.8|     4|
|        Rush Hour|1998|   7.0|     0|
|Avengers, Endgame|2019|   7.4|     0|
|         Bad Boyz|1995|   6.9|     0|
|        John Wick|2014|   7.4|     0|
+-----------------+----+------+------+

5. Принцип інверсії залежностей (DIP)📲 🧲

Принцип інверсії залежностей (DIP – Dependency Inversion Principle) стверджує, що високорівневі модулі не повинні залежати від низькорівневих модулів, і обидва повинні залежати лише від абстракцій.

В інтерпретації ООП це означає, що високорівневі та низькорівневі модулі (конкретні класи) повинні залежати лише від абстрактних класів. Інтерпретація ФП стверджує, що високорівневі та низькорівневі модулі (функції та їх внутрішня поведінка) повинні залежати лише від абстракцій (вхідних функцій, що передаються в інші функції), тобто ін’єкцій залежностей.

Змушуючи модулі залежати від абстрактних реалізацій, а не від конкретних, цей принцип підвищує рівень вільного зв’язку в коді програми, що полегшує розширення функціональності програми без модифікації існуючого коду.

A. Застосування ООП👥📌

class DataProcessor(ABC):
    @abstractmethod
    def read_data(self):
        pass

class CSVProcessor(DataProcessor):
    def __init__(self, spark: SparkSession, file_path: str):
        self.spark = spark 
        self.file_path = file_path 
        
    def read_data(self):
        print(f"Reading '{self.file_path}' CSV file into program ... ")
        return self.spark.read.csv(self.file_path)
    

class DeltaProcessor(DataProcessor):
    def __init__(self, spark: SparkSession, file_path: str):
        self.spark = spark 
        self.file_path = file_path 
       
    def read_data(self):
        print(f"Reading '{self.file_path}' delta file into program ... ")
        return self.spark.read.delta(self.file_path)


spark = SparkSession.builder.appName("Another Test App for SDW's blog").getOrCreate()

csv_processor = CSVProcessor(spark, "test_location/test_file.csv")
delta_processor = DeltaProcessor(spark, "test_location/test_file.delta")

csv_processor.read_data()
delta_processor.read_data()

Цей приклад відповідає DIP, оскільки класи CSVProcessor і DeltaProcessor є низькорівневими модулями, які залежать від DataProcessor, що є абстрактним класом. Оскільки DataProcessor є абстракцією, він дозволяє створювати інші конкретні класи, подібно до того, як було створено класи CSVProcessor і DeltaProcessor.

### Output:

Reading 'test_location/test_file.csv' CSV file into program ...
Reading 'test_location/test_file.delta' delta file into program ...

B. Застосування ФП🌟📌

from typing import Callable
from pyspark.sql import DataFrame, SparkSession, Row
from pyspark.sql.functions import col, when, lit, round
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType

# Specify food schema

food_schema = StructType([
    StructField("food_type", StringType(), True),
    StructField("calories", IntegerType(), True),
    StructField("ratings", DoubleType(), True),
    StructField("price", DoubleType(), True)
    
])

# Create Spark Session
spark = SparkSession.builder.appName("FP App of DIP").getOrCreate()

# Create Spark dataframe
food_data = [Row(food_type="Chips", calories=50,    ratings=6.5, price=3.99),
             Row(food_type="Salad", calories=100,   ratings=4.0,   price=5.99),
             Row(food_type="Burger", calories= 200, ratings=8.5, price=7.99),
             Row(food_type="Soup",  calories= 130, ratings=7.0, price=6.99)
            ]

food_df = spark.createDataFrame(food_data, food_schema)

# Create function for renaming columns 
def rename_columns(df: DataFrame) -> DataFrame:
    return (df.withColumnRenamed("food_type", "item")
              .withColumnRenamed("calories", "calories_in_kcal")
              .withColumnRenamed("ratings", "customer_ratings")
           )


# Create function for adding food categories
def add_food_category(df: DataFrame) -> DataFrame:
    return (df.withColumn("food_category", F.when(F.col("item").isin(["Pizza", "Burger", "Hotdog"]), "Fast Food")
                          .when(F.col("item").isin(["Soup", "Salad"]), "Healthy")
                          .otherwise("Others")
                         ))


# Create function for adding vegetarian flag 
def add_vegetarian(df: DataFrame) -> DataFrame:
    return (df.withColumn("is_vegetarian", 
                          F.when(F.col("item").isin(["Salad", "Soup"]), True)
                          .otherwise(False)
                         ))


# Execute transformations
transformed_food_df = (food_df
                       .transform(rename_columns)
                       .transform(add_food_category)
                       .transform(add_vegetarian)
                      )

У цьому прикладі абстракції – це функції, які передаються у кожну функцію перетворення, застосовану до початкового датафрейму food_df (тобто функції rename_columns, add_food_category і add_vegetarian), де високорівневим модулем вважається весь конвеєр, що генерує трансформований датафрейм food_df, а низькорівневі модулі – реалізована логіка в кожній із згаданих абстрактних функцій, що робить цей приклад функціонального програмування гармонійним з DIP.

# Display results
transformed_food_df.show()

### Output: 
+------+----------------+----------------+-----+-------------+-------------+
|  item|calories_in_kcal|customer_ratings|price|food_category|is_vegetarian|
+------+----------------+----------------+-----+-------------+-------------+
| Chips|              50|             6.5| 3.99|       Others|        false|
| Salad|             100|             4.0| 5.99|      Healthy|         true|
|Burger|             200|             8.5| 7.99|    Fast Food|        false|
|  Soup|             130|             7.0| 6.99|      Healthy|         true|
+------+----------------+----------------+-----+-------------+-------------+

Чому PySpark більше підходить для функціонального програмування (ФП), ніж ООП📊🥊

PySpark надає перевагу парадигмі функціонального програмування (ФП) більше, ніж ООП. Ось деякі з причин чому:

  • Незмінність 🛡️- Як згадувалося раніше, PySpark більше працює з незмінними типами даних, що є одним з ключових принципів функціонального програмування
  • Паралелізм 🧩- FP зменшує ймовірність виникнення побічних ефектів, що дозволяє легко розподіляти завдання між різними машинами
  • Обчислення без стану 🌍- Операції PySpark можуть виконуватися незалежно з ООП, оскільки він не надає пріоритету управлінню станом, як це робить ООП

Чи можна застосувати принципи SOLID до PySpark?🧐🔩?

Безумовно! Хоча ця стаття, як видається, має на увазі, що PySpark є більш сприятливим для використання в рамках ПС, це не означає, що PySpark не може скористатися перевагами, які можуть принести принципи SOLID, навіть в рамках парадигми ООП.

Це просто означає, що інженери повинні використовувати своє професійне судження для оцінки потреб свого проекту і включати принципи відповідно, оскільки кожен проект потребує різних парадигм, щоб керувати життєвим циклом розробки.

Наприклад, якщо проект потребує державного управління, ООП буде більш підходящим вибором. Однак, якщо пріоритетом проекту є розподілені обчислення з незмінними структурами даних, більш життєздатним варіантом буде FP.

Висновок🏁

Інтегрувати всі принципи SOLID в кожен проект легше сказати, ніж зробити, тому важливо розуміти, коли надавати пріоритет одним принципам над іншими, а коли включити кілька, а решту залишити. Архітектори та інженери, які керують проектом з інженерії даних, повинні зробити цю оцінку і вирішити, що найкраще підходить для конкретної ситуації.

Додатково Resources⚙️

Для більш глибокого занурення та розуміння цих тем інженерії даних з чіткими та простими прикладами коду про те, як вони застосовуються в реальному світі, підпишіться на мою розсилку нижче, де ви також можете надіслати будь-які питання, які у вас виникли в приватному порядку, і я буду радий відповісти на них безкоштовно:

https://aianddatainsights.beehiiv.com/subscribe?source=post_page—–e604577837b——————————–
https://medium.com/@sdw-online/subscribe

Не соромтеся звертатися через мої ручки: LinkedIn| Email | Twitter

ОРИГІНАЛ СТАТТІ:SOLID Principles for Spark Applications

АВТОР СТАТІ:Stephen David-Williams

🚀Долучайтесь до нашої спільноти Telegram:

🚀Долучайтесь до нашої спільноти FaceBook:

🚀Долучайтесь до нашої спільноти Twiter X:

Leave a Reply

Your email address will not be published. Required fields are marked *