The dagster_pandas library provides the ability to perform data validation, emit summary statistics, and enable reliable dataframe serialization/deserialization. On top of this, the Dagster type system generates documentation of your dataframe constraints and makes it accessible in the Dagit UI.
To create a custom dagster_pandas type, use create_dagster_pandas_dataframe_type and provide a list of PandasColumn objects which specify column-level schema and constraints. For example, we can construct a custom dataframe type to represent a set of e-bike trips in the following way:
from datetime import datetime

from dagster_pandas import PandasColumn, create_dagster_pandas_dataframe_type

TripDataFrame = create_dagster_pandas_dataframe_type(
    name="TripDataFrame",
    columns=[
        PandasColumn.integer_column("bike_id", min_value=0),
        PandasColumn.categorical_column("color", categories={"red", "green", "blue"}),
        PandasColumn.datetime_column(
            "start_time", min_datetime=datetime(year=2020, month=2, day=10)
        ),
        PandasColumn.datetime_column(
            "end_time", min_datetime=datetime(year=2020, month=2, day=10)
        ),
        PandasColumn.string_column("station"),
        PandasColumn.exists("amount_paid"),
        PandasColumn.boolean_column("was_member"),
    ],
)
Once our custom data type is defined, we can use it as the type declaration for the inputs and outputs of our solids:
from dagster import OutputDefinition, solid
from dagster.utils import script_relative_path
from pandas import DataFrame, read_csv

@solid(output_defs=[OutputDefinition(name="trip_dataframe", dagster_type=TripDataFrame)])
def load_trip_dataframe(_) -> DataFrame:
    return read_csv(
        script_relative_path("./ebike_trips.csv"),
        parse_dates=["start_time", "end_time"],
        date_parser=lambda x: datetime.strptime(x, "%Y-%m-%d %H:%M:%S.%f"),
    )
By passing in these PandasColumn objects, we are expressing the schema and constraints we expect our dataframes to follow when Dagster performs type checks for our solids. Moreover, Dagit renders this schema as documentation in the solid viewer.
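To see the type check fire, we can wire the solid into a pipeline and execute it. The following is a minimal sketch using the dagster 0.x pipeline APIs; the pipeline name and the execution snippet are our own illustration, not part of the original example:

from dagster import execute_pipeline, pipeline

@pipeline
def trip_pipeline():
    # The TripDataFrame type check runs against the solid's output
    # on every execution.
    load_trip_dataframe()

if __name__ == "__main__":
    result = execute_pipeline(trip_pipeline)
    assert result.success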
Now that we have a custom dataframe type that performs schema validation during a pipeline run, we can express dataframe-level constraints (e.g., number of rows or columns). To do this, we provide a list of dataframe constraints to create_dagster_pandas_dataframe_type; for example, a RowCountConstraint. More information on the available constraints can be found in the dagster_pandas API docs. This looks like:
from dagster_pandas import RowCountConstraint

ShapeConstrainedTripDataFrame = create_dagster_pandas_dataframe_type(
    name="ShapeConstrainedTripDataFrame", dataframe_constraints=[RowCountConstraint(4)]
)
If we rerun the above example with this dataframe type, nothing should change. However, if we pass 100 to the row count constraint, we can watch our pipeline fail that type check.
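Sketched out (the FailingTripDataFrame name is our own illustration), that failing variant would look like:

# Our example CSV satisfies RowCountConstraint(4) above, so demanding
# 100 rows here makes the output type check fail at runtime.
FailingTripDataFrame = create_dagster_pandas_dataframe_type(
    name="FailingTripDataFrame", dataframe_constraints=[RowCountConstraint(100)]
)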
Aside from constraint validation, create_dagster_pandas_dataframe_type also takes a summary statistics function that emits EventMetadataEntry objects, which are surfaced during pipeline runs. Since data systems seldom control the quality of the data they receive, it becomes important to monitor data as it flows through your systems. In complex pipelines, this can help debug and monitor data drift over time. Let's illustrate how this works in our example:
from dagster import EventMetadataEntry

def compute_trip_dataframe_summary_statistics(dataframe):
    return [
        EventMetadataEntry.text(
            min(dataframe["start_time"]).strftime("%Y-%m-%d"),
            "min_start_time",
            "Date data collection started",
        ),
        EventMetadataEntry.text(
            max(dataframe["end_time"]).strftime("%Y-%m-%d"),
            "max_end_time",
            "Date data collection ended",
        ),
        EventMetadataEntry.text(
            str(dataframe["bike_id"].nunique()),
            "num_unique_bikes",
            "Number of unique bikes that took trips",
        ),
        EventMetadataEntry.text(
            str(len(dataframe)), "n_rows", "Number of rows seen in the dataframe"
        ),
        EventMetadataEntry.text(
            str(dataframe.columns), "columns", "Keys of columns seen in the dataframe"
        ),
    ]

SummaryStatsTripDataFrame = create_dagster_pandas_dataframe_type(
    name="SummaryStatsTripDataFrame", event_metadata_fn=compute_trip_dataframe_summary_statistics
)
Now, if we run a pipeline that outputs a SummaryStatsTripDataFrame in the Dagit playground, these metadata entries are surfaced in the run logs.
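For reference, a solid producing this type would mirror load_trip_dataframe above; the solid name below is our own illustration:

@solid(
    output_defs=[
        OutputDefinition(name="summary_trip_dataframe", dagster_type=SummaryStatsTripDataFrame)
    ]
)
def load_summary_trip_dataframe(_) -> DataFrame:
    return read_csv(
        script_relative_path("./ebike_trips.csv"),
        parse_dates=["start_time", "end_time"],
        date_parser=lambda x: datetime.strptime(x, "%Y-%m-%d %H:%M:%S.%f"),
    )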
PandasColumn is user-pluggable with custom constraints: instead of using the factory methods above, a PandasColumn can be constructed directly and passed a list of ColumnConstraint objects.
To tie this back to our example, let's say that we want to validate that the amount paid for an e-bike trip must be in 5-dollar increments, because that is the price per mile rounded up. To do this, let's implement a DivisibleByFiveConstraint. All it needs is a markdown_description for Dagit (which accepts and renders markdown syntax), an error_description for error logs, and a validation method which throws a ColumnConstraintViolationException if a row fails validation. This would look like the following:
from dagster_pandas.constraints import (
    ColumnConstraint,
    ColumnConstraintViolationException,
    ColumnDTypeInSetConstraint,
)

class DivisibleByFiveConstraint(ColumnConstraint):
    def __init__(self):
        message = "Value must be divisible by 5"
        super(DivisibleByFiveConstraint, self).__init__(
            error_description=message, markdown_description=message
        )

    def validate(self, dataframe, column_name):
        rows_with_unexpected_buckets = dataframe[
            dataframe[column_name].apply(lambda x: x % 5 != 0)
        ]
        if not rows_with_unexpected_buckets.empty:
            raise ColumnConstraintViolationException(
                constraint_name=self.name,
                constraint_description=self.error_description,
                column_name=column_name,
                offending_rows=rows_with_unexpected_buckets,
            )

CustomTripDataFrame = create_dagster_pandas_dataframe_type(
    name="CustomTripDataFrame",
    columns=[
        PandasColumn(
            "amount_paid",
            constraints=[ColumnDTypeInSetConstraint({"int64"}), DivisibleByFiveConstraint()],
        )
    ],
)
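As a quick sanity check, we can exercise the constraint directly against a toy dataframe; this snippet is our own illustration rather than part of the original example:

import pandas as pd

# 25 is divisible by 5; 27 is not, so validation raises.
trips = pd.DataFrame({"amount_paid": [25, 27]})

try:
    DivisibleByFiveConstraint().validate(trips, "amount_paid")
except ColumnConstraintViolationException:
    print("amount_paid failed the DivisibleByFiveConstraint check")

During a pipeline run, the same violation would surface as a failed type check on any solid that outputs a CustomTripDataFrame.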