A Practical Example Of The Pipeline Pattern In Python
What is this pattern about?
The Pipeline design pattern (closely related to the Chain of Responsibility pattern, sometimes called Chain of Command) is a flexible way to handle a sequence of actions, where each handler in the chain processes the input data and passes its result to the next handler. This pattern is commonly used in scenarios involving data processing, web scraping, or middleware systems.
In this blog post, I’ll walk you through a specific example that leverages Python’s powerful functools.reduce and partial functions, along with the BeautifulSoup library for parsing HTML content. This code showcases the Pipeline pattern applied to HTML table extraction and processing.
What Does the Code Do?
The code defines a pipeline of data parsing steps for extracting and cleaning tables from an HTML file. It follows a functional programming approach to compose several processing functions into a unified flow using the Chain of Command pattern.
Key Concepts
- Functional Composition: Combining multiple functions into one that executes in a specific order.
- Data Parsing Pipeline: Sequential processing of HTML content into structured data (a DataFrame).
- Error Handling: Ensuring the pipeline gracefully handles missing or malformed data.
Let’s break down the code step by step:
1. Function Composition with compose
from functools import reduce, partial
from typing import Callable
The pipeline is created by composing multiple parsing functions into a single unified function. The compose function uses reduce to chain these functions together:
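# ParsingPipeline is a type alias defined elsewhere in the module (not shown here);
# presumably a single-argument callable such as Callable[[T], T].
def compose(*functions: ParsingPipeline) -> ParsingPipeline:
    """Composes functions into a single function"""
    # Apply the functions left to right, starting from the identity function.
    return reduce(lambda f, g: lambda x: g(f(x)), functions, lambda x: x)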
This allows you to define an ordered flow of operations that process input data from one function to the next. Each function modifies the input data, which is then passed down the pipeline.
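To make the ordering concrete, here is a minimal, self-contained sketch of the same compose idea applied to toy functions. The Step alias and the helpers add_one and double are hypothetical names used only for illustration; Step stands in for ParsingPipeline, whose definition is not shown in this post.

```python
from functools import reduce
from typing import Any, Callable

# Hypothetical alias standing in for ParsingPipeline (assumption).
Step = Callable[[Any], Any]


def compose(*functions: Step) -> Step:
    """Compose functions left to right: compose(f, g)(x) == g(f(x))."""
    return reduce(lambda f, g: lambda x: g(f(x)), functions, lambda x: x)


def add_one(x: int) -> int:
    return x + 1


def double(x: int) -> int:
    return x * 2


# The first function listed runs first, so the order changes the result.
add_then_double = compose(add_one, double)   # (3 + 1) * 2
double_then_add = compose(double, add_one)   # (3 * 2) + 1

# With no functions at all, reduce returns the initial identity function.
identity = compose()

print(add_then_double(3), double_then_add(3), identity(3))
```

Because the initial value passed to reduce is the identity function, an empty pipeline is still a valid pipeline that returns its input unchanged.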
2. Reading HTML Content
The first step in the pipeline is to read the contents of an HTML file. This is done by read_htm_from:
def read_htm_from(filename: T, mode: T = "r", encoding: T = "utf-8") -> T:
    with open(filename, mode, encoding=encoding) as file:
        html_content = file.read()
    return html_content
This function opens an HTML file and returns its content as a string. The mode and encoding parameters default to text mode ("r") and UTF-8, so a file saved in a different encoding can be read by overriding them.
Note that T is defined here as TypeVar("T"); see the typing docs.
3. Parsing the HTML Table
Next, read_table_from uses BeautifulSoup to find the HTML table within the file:
from bs4 import BeautifulSoup

def read_table_from(htm_file: T, parser: str = "html.parser") -> T:
    soup = BeautifulSoup(htm_file, parser)
    table = soup.find("table")
    return table
This function converts the HTML content into a BeautifulSoup object and extracts the first table it finds. The parsed table is passed down the pipeline for further processing.
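As a quick illustration of what "the first table it finds" means, the sketch below parses a small inline HTML string instead of a file. The sample markup is made up, and the find_all("tr") call is only an assumption about what a row-reading step might do next; it is not taken from the original code.

```python
from bs4 import BeautifulSoup

# Made-up sample document with two tables (illustration only).
html = """
<html><body>
  <table><tr><th>Ticket</th></tr><tr><td>1</td></tr></table>
  <table><tr><td>second table</td></tr></table>
</body></html>
"""

soup = BeautifulSoup(html, "html.parser")

# find() returns only the first matching element.
first_table = soup.find("table")

# Assumed follow-up step: collect the rows of that table.
rows = first_table.find_all("tr")
print(len(rows))

# When the document has no table, find() returns None, so later steps
# that expect a table need to account for that case.
missing = BeautifulSoup("<p>no table here</p>", "html.parser").find("table")
print(missing is None)
```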
4. Extracting Rows and Data
Once the table is identified, the pipeline extracts the rows and applies filtering logic based on custom markers:
def extract_row_data_from(
    table_rows: T, start_markers: T, continue_markers: T, end_markers: T
) -> T:
    row_data: T = []
    start_processing = False
    for row in table_rows:
        if any(marker in row.text for marker in start_markers) and not start_processing:
            start_processing = True
            continue
        if start_processing:
            if any(marker in row.text for marker in continue_markers):
                continue
            if any(marker in row.text for marker in end_markers):
                break
            row_data.append(row)
    return row_data[:-1]
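This function inspects each row in the table, checking whether the row text contains any of the specified start, continue, or end markers. Collection begins with the row after the first start marker, rows containing a continue marker are skipped, and the loop stops at the first row containing an end marker. The final slice, row_data[:-1], also drops the last row collected before the end marker.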
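The marker logic is easier to follow on a tiny input. The sketch below restates the function without the type hints and runs it on hypothetical rows; FakeRow is a made-up stand-in for a BeautifulSoup row (only its text attribute is used), and the row contents are invented for illustration.

```python
from dataclasses import dataclass


@dataclass
class FakeRow:
    """Hypothetical stand-in for a parsed table row; only .text is used."""
    text: str


def extract_row_data_from(table_rows, start_markers, continue_markers, end_markers):
    row_data = []
    start_processing = False
    for row in table_rows:
        # The first row containing a start marker switches collection on.
        if any(m in row.text for m in start_markers) and not start_processing:
            start_processing = True
            continue
        if start_processing:
            # Rows with a continue marker are skipped.
            if any(m in row.text for m in continue_markers):
                continue
            # The first row with an end marker stops the loop.
            if any(m in row.text for m in end_markers):
                break
            row_data.append(row)
    # The last collected row is dropped.
    return row_data[:-1]


rows = [FakeRow(t) for t in [
    "Account: 123",             # before the start marker: ignored
    "Closed Transactions:",     # start marker
    "Ticket Open Time Profit",  # collected
    "Deposit 1000",             # continue marker: skipped
    "trade 1",                  # collected
    "trade 2",                  # collected
    "totals",                   # collected, then dropped by [:-1]
    "Closed P/L: 10.0",         # end marker: stop
    "Open Trades:",             # never reached
]]

selected = extract_row_data_from(
    rows,
    start_markers=["Closed Transactions:"],
    continue_markers=["Genbox", "balance", "Deposit"],
    end_markers=["Closed P/L:"],
)
texts = [row.text for row in selected]
print(texts)
```

Note that the marker checks are plain substring tests, so they are case-sensitive.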
5. Converting Rows to DataFrame
The next steps involve transforming the extracted row data into a structured pandas DataFrame. First, the rows are separated into individual columns using separate_columns_in:
def separate_columns_in(rows: T) -> T:
    data_rows: T = []
    try:
        for row in rows:
            columns = row.find_all(["td", "th"])
            data = [col.text for col in columns]
            data_rows.append(data)
        return data_rows
    except Exception as e:
        print(f"An error occurred: {str(e)}")
        return []
Then, convert_to_dataframe reshapes this data into a pandas DataFrame:
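import pandas as pd

# COLUMN_NAMES and COLUMNS_TO_REMOVE are module-level constants
# defined elsewhere in the code (not shown here).
def convert_to_dataframe(data_rows: T) -> T:
    df = pd.DataFrame(data_rows)
    # Promote the first row to the header, then drop it from the data.
    df = df.rename(columns=df.iloc[0]).drop(df.index[0])
    df.columns = COLUMN_NAMES
    df.drop(columns=COLUMNS_TO_REMOVE, inplace=True)
    df.set_index(df.columns[0], inplace=True, drop=True)
    return df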
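The DataFrame is cleaned up in four steps: the first row is promoted to the header and dropped from the data, the columns are renamed to COLUMN_NAMES, the columns listed in COLUMNS_TO_REMOVE are discarded, and the first remaining column becomes the index.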
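The header-promotion step is the least obvious part, so here is a small sketch of it on made-up data. The column names and values are invented for illustration, and the COLUMN_NAMES and COLUMNS_TO_REMOVE steps are left out because those constants are not shown in this post.

```python
import pandas as pd

# Made-up rows as they might come out of the column-splitting step:
# the first inner list holds the header cells.
data_rows = [
    ["Ticket", "Open Time", "Profit"],
    ["1", "2024-01-02 10:00", "12.5"],
    ["2", "2024-01-03 11:30", "-3.0"],
]

df = pd.DataFrame(data_rows)  # columns are 0, 1, 2 at this point

# Use the first row as the column labels, then remove that row.
df = df.rename(columns=df.iloc[0]).drop(df.index[0])

# Make the first column the index.
df = df.set_index(df.columns[0])

print(df)
```

All values are still strings at this stage, which is why a separate type-conversion step is needed afterwards.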
6. Assigning Correct Data Types
Finally, assign_correct_data_type_to ensures that the DataFrame columns have the appropriate data types:
def assign_correct_data_type_to(
    df: T,
    dict_types: dict[str, str] = COLUMN_TYPES,
    datetime_columns: list[str] = DATETIME_COLUMN_NAMES,
) -> T:
    if not isinstance(df, pd.DataFrame):
        raise ValueError("Input `df` must be a pandas DataFrame.")
    # Work on a copy so the caller's DataFrame is not modified.
    df = df.copy()
    for column in datetime_columns:
        if column in df.columns:
            df[column] = pd.to_datetime(df[column])
    for column, col_type in dict_types.items():
        if column in df.columns:
            try:
                if col_type == "numeric":
                    df[column] = pd.to_numeric(df[column], errors="coerce")
                else:
                    # astype returns a new Series, so assign the result back.
                    df[column] = df[column].astype(col_type)
            except Exception as e:
                print(f"Error converting column {column} to {col_type}: {e}")
    return df
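This function converts columns to datetime, numeric, or another configured type as needed, so that the data is properly structured for analysis. Because pd.to_numeric is called with errors="coerce", values that cannot be parsed as numbers become NaN instead of raising an error.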
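The three kinds of conversion can be tried on a small made-up DataFrame. The column names and values below are invented for illustration and are not the COLUMN_TYPES or DATETIME_COLUMN_NAMES used in the real code.

```python
import pandas as pd

# Made-up data where every value starts out as a string.
df = pd.DataFrame(
    {
        "Open Time": ["2024-01-02 10:00:00", "2024-01-03 11:30:00"],
        "Profit": ["12.5", "n/a"],
        "Symbol": ["EURUSD", "GBPUSD"],
    }
)

# Datetime conversion.
df["Open Time"] = pd.to_datetime(df["Open Time"])

# Numeric conversion: unparseable values such as "n/a" become NaN.
df["Profit"] = pd.to_numeric(df["Profit"], errors="coerce")

# Any other dtype goes through astype; it returns a new Series,
# so the result has to be assigned back to the column.
df["Symbol"] = df["Symbol"].astype("category")

print(df.dtypes)
```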
7. Putting It All Together
At the end of the code, the pipeline is composed by chaining all of the above functions together:
parse_gbx_bt: ParsingPipeline = compose(
    partial(read_htm_from, mode="r", encoding="utf-8"),
    read_table_from,
    read_rows_from,
    partial(
        extract_row_data_from,
        start_markers=["Closed Transactions:"],
        continue_markers=["Genbox", "balance", "Deposit"],
        end_markers=["Closed P/L:"],
    ),
    separate_columns_in,
    convert_to_dataframe,
    assign_correct_data_type_to,
)
This creates a fully automated pipeline that:
- Reads an HTML file.
- Extracts table data.
- Cleans and converts the data into a pandas DataFrame.
- Assigns the correct data types.
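The role of partial here is to turn functions that need extra configuration into single-argument steps, which is the only shape compose can chain. The sketch below shows that idea with toy text-processing steps; every helper name in it (Step, split_lines, keep_lines_with, to_floats) is hypothetical and unrelated to the real parsing functions.

```python
from functools import partial, reduce
from typing import Any, Callable

# Hypothetical alias standing in for ParsingPipeline (assumption).
Step = Callable[[Any], Any]


def compose(*functions: Step) -> Step:
    """Compose functions left to right."""
    return reduce(lambda f, g: lambda x: g(f(x)), functions, lambda x: x)


def split_lines(text: str) -> list[str]:
    return text.splitlines()


def keep_lines_with(lines: list[str], marker: str) -> list[str]:
    # Needs a second argument, so it cannot be used as a step directly.
    return [line for line in lines if marker in line]


def to_floats(lines: list[str], sep: str = "=") -> list[float]:
    return [float(line.split(sep)[1]) for line in lines]


# partial fixes the extra keyword arguments up front, leaving a callable
# that takes only the data flowing through the pipeline.
parse_profits = compose(
    split_lines,
    partial(keep_lines_with, marker="profit"),
    partial(to_floats, sep="="),
    sum,
)

total = parse_profits("profit=1.5\nnote=ignored\nprofit=2.0")
print(total)
```

Keeping the configuration in the partial calls means the step functions stay reusable, and a differently configured pipeline is just another compose call.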
Conclusion
This implementation of the Chain of Command or Pipeline pattern in Python demonstrates how to apply functional programming principles to data parsing tasks. Combining functools.reduce, partial, and BeautifulSoup provides a flexible, reusable way to process HTML content and structure it into usable data.
If you’re looking to create complex data processing pipelines that need to handle dynamic data from HTML or other sources, this approach is a clean and maintainable solution.
You can find the code in the repo: https://github.com/jjeg1979/pyBacktestAnalyzer.
And if you want to watch the code clinic where I presented the tool, feel free to check it out at https://pybites.circle.so/c/circle-coaching-calls/python-for-the-trader-code-clinic.
If you cannot access…well, what are you waiting for to become a PDM member?