Python数据管道中的设计模式

使用适当的代码设计模式可以使您的代码易于阅读、可扩展,并且可以无缝地修改现有逻辑、调试,并使开发人员能够更快地上手。

为了演示代码设计模式,我们将构建一个简单的 ETL 项目,让我们从 Reddit 中提取数据,对其进行转换并将其存储在 sqlite3 数据库中。

flowchart LR A[API] -->|Extract| B[Transform ] B -->|Load| C[Database]
已完成的项目可在github/com/josephmachado/socialetl 上找到,并附有设置说明 。

让我们从一个简单的 Reddit etl 脚本开始,我们将在这篇文章中对其进行重构。

# social_etl.py
# for complete code check out https://github.com/josephmachado/socialetl
import praw
import os
import sqlite3

REDDIT_CLIENT_ID='replace-with-your-reddit-client-id'
REDDIT_CLIENT_SECRET='replace-with-your-reddit-client-secret'
REDDIT_USER_AGENT='replace-with-your-Reddit-user-agent'

def extract():
    client = praw.Reddit(
        client_id=REDDIT_CLIENT_ID,
        client_secret=REDDIT_CLIENT_SECRET,
        user_agent=REDDIT_USER_AGENT,
    )
    subreddit = client.subreddit('dataengineering')
    top_subreddit = subreddit.hot(limit=100)
    data = []
    for submission in top_subreddit:
        data.append(
            {
                'title': submission.title,
                'score': submission.score,
                'id': submission.id,
                'url': submission.url,
                'comments': submission.num_comments,
                'created': submission.created,
                'text': submission.selftext,
            }
        )
    return data


def transform(data):
   
"""
    Function to only keep outliers.
    Outliers are based on num of comments > 2 standard deviations from mean
   
"""
    num_comments = [post.get('comments') for post in data]

    mean_num_comments = sum(num_comments) / len(num_comments)
    std_num_comments = (
        sum([(x - mean_num_comments) ** 2 for x in num_comments])
        / len(num_comments)
    ) ** 0.5
    return [
        post
        for post in data
        if post.get('comments') > mean_num_comments + 2 * std_num_comments
    ]


def load(data):
    # create a db connection
    conn = sqlite3.connect('./data/socialetl.db')
    cur = conn.cursor()
    try:
        # insert data into DB
        for post in data:
            cur.execute(
               
"""
                    INSERT INTO social_posts (
                        id, source, social_data
                    ) VALUES (
                        :id, :source, :social_data
                    )
                   
""",
                {
                    'id': post.get('id'),
                    'score': post.get('score'),
                    'social_data': str(
                        {
                            'title': post.get('title'),
                            'url': post.get('url'),
                            'comments': post.get('num_comments'),
                            'created': post.get('created'),
                            'text': post.get('selftext'),
                        }
                    ),
                },
            )
    finally:
        conn.commit()
        conn.close()


def main():
    # pull data from Reddit
    data = extract()
    # transform reddit data
    transformed_data = transform(data)
    # load data into database
    load(transformed_data)
    
if __name__ == '__main__':
    main()


1、函数式设计
作为一名 DE,您可能听过人们说“编写功能代码”,让我们来解释一下它的含义。

  1. 原子性:一个函数应该只执行一项任务。
  2. 幂等性:如果使用相同的输入多次运行代码,则输出应该是相同的。在将输出存储在外部数据存储中的情况下,输出不应重复。
  3. 无副作用:函数除了其输出之外不应影响任何外部数据(变量或其他)。

注意:附加 FP 概念高阶函数 、函数组合 和引用透明度 。


让我们检查一下上面代码中的load方法:

  • 原子性:不是,因为它要做两件事:管理数据库连接和向数据库加载数据。我们可以使用依赖注入技术将数据库连接作为加载函数的输入。
  • 幂等性:不能,因为它会将所有数据插入 social_posts 表。如果加载函数的输入有重复数据,或者加载函数意外运行了两次,那么重复数据就会插入到 social_posts 表中。我们可以使用 UPSERT 来防止这种情况,它将根据记录是否已经存在(由键标识)来更新或插入记录。
  • 无副作用:加载函数没有副作用。但请注意,当加载函数开始接受数据库连接作为输入参数时(依赖注入),我们不应在加载函数中关闭数据库连接,因为这将影响加载函数外部变量的状态。

让我们看看函数式load函数是什么样子的:

def load(social_data, db_conn) -> None:
    logging.info('Loading twitter data.')
    if db_conn is None:
        raise ValueError(
            'db_cursor is None. Please pass a valid DatabaseConnection'
            ' object.'
        )

    cur = db_conn.cursor()
    try:
        for post in social_data:
            cur.execute(
                """
                INSERT OR REPLACE INTO social_posts (
                    id, source, social_data
                ) VALUES (
                    :id, :source, :social_data
                )
               
""",
                {
                    'id': post.id,
                    'source': post.source,
                    'social_data': str(asdict(post.social_data)),
                },
            )
    finally:
        cur.close()
        db_conn.commit()

2.工厂模式
假设我们必须设计数据管道,从 Twitter、Mastodon、Linkedin 等处提取数据。所有这些社交数据管道都遵循类似的模式。我们可以使用 "工厂 "模式来创建一个统一的界面,用于提取社交数据。让我们来看看工厂模式是如何工作的:

  1. 当多个管道遵循类似模式时,请使用工厂模式。例如,如果要为 Twitter、mastodon、Linkedin 等添加 etl。
  2. 工厂将负责创建相应的(Reddit、Twitter 或 mastodon)etl 对象。调用工厂的代码将使用该 etl 对象,而不知道其内部实现。
  3. 这样就避免了难以管理的复杂 if...else 语句,并提供了与多个类似管道交互的标准接口。
  4. 你可以定义一组所有 ETL 类都必须实现的标准方法。通用方法的名称和签名(输入和输出)被称为抽象接口,因为它定义了我们如何与任何实施该标准的 ETL 进行交互(例如,参见 SocialETL 类)。实际实现被称为具体实现(例如,RedditETL)。

完整代码:here .

import os
from abc import ABC, abstractmethod # python module to define abstract interfaces

# Abstract class with abstract methods
class SocialETL(ABC):
    @abstractmethod
    def extract(self, id, num_records, client):
        pass

    @abstractmethod
    def transform(self, social_data):
        pass

    @abstractmethod
    def load(self, social_data, db_conn):
        pass

    @abstractmethod
    def run(self, db_conn, client, id, num_records):
        pass

# Concrete implementation of the abstract Class
class RedditETL(SocialETL):
    def extract(self, id, num_records, client):
        # code to extract reddit data

    def transform(self, social_data):
        # code to transform reddit data

    def load(self, social_data, db_conn):
        # code to load reddit data into the final table

    def run(self, db_conn, client, id, num_records):
        # code to run extract, transform and load

# Concrete implementation of the abstract Class
class TwitterETL(SocialETL):
    def extract(self, id, num_records, client):
        # code to extract reddit data

    def transform(self, social_data):
        # code to transform reddit data

    def load(self, social_data, db_conn):
        # code to load reddit data into the final table

    def run(self, db_conn, client, id, num_records):
        # code to run extract, transform and load

# This "factory" will acccept an input and give you the appropriate object that you can use to perform ETL
def etl_factory(source):
    factory = {
        'Reddit': (
            praw.Reddit(
                client_id=os.environ['REDDIT_CLIENT_ID'],
                client_secret=os.environ['REDDIT_CLIENT_SECRET'],
                user_agent=os.environ['REDDIT_USER_AGENT'],
            ),
            RedditETL(),
        ),
        'Twitter': (
            tweepy.Client(bearer_token=os.environ['BEARER_TOKEN']),
            TwitterETL(),
        ),
    }
    if source in factory:
        return factory[source]
    else:
        raise ValueError(
            f
"source {source} is not supported. Please pass a valid source."
        )

# calling code
client, social_etl = etl_factory(source)
social_etl.run(db_conn, client, ...)

责任分离(定义、创建和使用)对于代码的维护和可测试性至关重要,并可防止在出现新功能时在多个地方修改代码。请注意,即使我们使用类,我们仍然遵循函数级的功能原则。

优点

  • 如果您有多个类似的 etls,工厂可以显著提高代码的一致性,并使您的管道更容易发展。
  • 使用工厂模式建立与外部系统的连接可以简化测试。例如,通过数据库工厂,您可以在开发测试中使用 sqllite3,在产品测试中使用 pg。你还可以将 spark 会话放在工厂后面,以便在开发阶段使用更少的执行器,而在生产阶段使用更高的内存设置,等等。

缺点
  • 如果使用工厂方法来定义本质上不同的数据管道(例如,ETL v ELT 或 API 数据拉取 vs. S3 -> S3 数据传输等),会使代码变得非常复杂和脆弱。只有当底层数据管道具有相似的结构时,才能使用工厂。
  • 在只有一两个数据管道(没有迹象表明会有更多数据管道)的情况下使用工厂方法是不成熟的优化,可能会因抽象接口限制开发速度而导致开发缓慢。

3.策略模式
我们最初的转换函数使用标准偏差来识别异常值(见转换函数)。假设我们想从多个变换函数中进行选择,例如随机选择几个帖子或不根据输入管道的输入应用任何变换。

我们可以使用策略模式(Strategy Pattern),它允许我们的代码从多种转换方法中选择一种转换方式(又称从多种策略中选择一种策略)。下面是对从 Reddit 或 Twitter 提取的数据进行转换的一些策略:

import random
import logging

def no_transformation(social_data):
    logging.info('No transformation applied.')
    return social_data


def random_choice_filter(social_data):
    logging.info('Randomly choosing 2 social media data points.')
    return random.choices(social_data, k=2)


def standard_deviation_outlier_filter(social_data):
    # code to only keep standard deviation based outlier
    return filtered_data

# a factory to return the appropriate transformation function
def transformation_factory(value):
        factory = {
            'sd': standard_deviation_outlier_filter,
            'no_tx': no_transformation,
            'rand': random_choice_filter,
        }
        return factory[value]

class Reddit(SocialETL):

    def transform(self, social_data, transform_function):
           """函数转换 reddit 数据,只保留
            评论数大于平均值 2 个标准差的帖子。
            的帖子。
            参数:
                social_data(List[RedditPostData]):Reddit 帖子数据列表。
            返回 返回 返回 返回值值值值
                List[RedditPostData]:经过筛选的 reddit 帖子数据列表。
           
"""
            logging.info('Transforming reddit data.')
            return transform_function(social_data)

     def run(self, db_cursor_context, client, transform_function, id = 'dataengineering', num_records = 100):
       
"""运行 ETL 管道的函数。
        参数:
            db_cursor_context (数据库连接):数据库连接。
            client (praw.Reddit):Reddit 客户端。
            id (str):要获取数据的 Subreddit。
            num_records(int):要获取的记录数。
       
"""
        logging.info('Running reddit ETL.')
        self.load(
            social_data=self.transform(
                social_data=self.extract(
                    id=id, num_records=num_records, client=client
                ),
                transform_function=transform_function,
            ),
            db_cursor_context=db_cursor_context,
        )


    # other methods

# Calling code
transformation = 'sd' # 'no_tx' or 'rand'
client, social_etl = etl_factory(source)
db = db_factory()
social_etl.run(
    db_cursor_context=db.managed_cursor(),
    client=client,
    transform_function=transformation_factory(transformation),
)

请注意调用代码是如何根据转换transformation 变量注入转换transformation 函数的。
关键在于转换transformation 函数的输入和输出参数要相同,这样才能切换。
如果没有适当的日志记录,就很难了解执行的是哪个函数。

4.单例和对象池模式
当程序在整个运行过程中只需拥有一个类对象时,请使用单例模式。单例模式常用于数据库连接、日志等。然而,由于所有测试都只能使用一个对象,它可能会给测试带来极大的困难。如果设计时没有足够的防护措施,就可能在 Python 中创建多个单例类。一般来说,Singleton 被认为是一种反模式。

建立在 Singleton 基础上的一种模式被称为对象池模式。在这种模式中,你不能只使用一个对象,而是可以使用对象池中的一个对象。对象池的大小根据使用情况来设定。对象池模式常见于有多个传入请求并需要快速与数据库通信的应用程序(如后端应用程序、流处理)。有了数据库连接池,传入请求就能与数据库通信,而无需创建新连接(耗时较长)或等待单例对象完成为其他请求提供服务。例如,Psycopg2 连接池。不过要注意的是,连接在使用后必须恢复到初始状态,然后才能返回连接池。

在批量数据管道应用程序中,通常最好使用工厂方法来创建 DB 连接,因为这样可以根据环境灵活设置配置,而且无需清理连接以将其返回池等。

查看我们的数据库连接工厂 here