Event Driven Pipelines
Overview
Event driven pipelines are a new way of orchestrating workflows in Peach.
peach-tasks run on a schedule; peach-endpoints serve realtime / streaming requests.
Until now, a peach-task ran in isolation from other peach-tasks and peach-endpoints: it would run on a schedule and do some work.
A common pattern is to store the result of the task's computation in Redis, InfluxDB, or Milvus, and have Recommendation APIs (peach-endpoints) serve recommendations from that store.
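A minimal sketch of that pattern might look like the following (the task and endpoint names, the Redis key layout, and the redis-py usage are illustrative assumptions, not part of the Peach API):

import json
import redis  # assumption: the redis-py client is available in the environment

r = redis.Redis(host="localhost", port=6379)  # illustrative connection details

def build_recommendations_task():
    # hypothetical scheduled peach-task: compute recommendations and store them
    recommendations = ["item_1", "item_7", "item_42"]  # placeholder result
    r.set("recommendations:user_42", json.dumps(recommendations))

def recommendations_endpoint(request):
    # hypothetical peach-endpoint: serve the precomputed recommendations
    user_id = request.json["user_id"]
    cached = r.get(f"recommendations:{user_id}")
    return json.loads(cached) if cached else []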
Event driven pipelines
Now peach-tasks and peach-endpoints can work together to orchestrate more complex event driven workflows.
An example would be a webhook for audio file transcription: the webhook receives the audio file and triggers a task to transcribe it, and the transcribed text can then be fed to a recommendation task to generate recommendations.
Syntax for triggering a task with an event:
You can use a helper function to trigger a peach-task from inside another peach-task, a peach-endpoint, or even from inside peach-lab.
Here is the syntax for triggering a task:
from pipe_algorithms_lib.task_helpers import trigger_task_with_event

TASK_NAME = "collaborative_filtering"  # one of your real task names as specified in peach.conf
PAYLOAD = {"key1": "value1", "key2": "value2"}

trigger_task_with_event(TASK_NAME, PAYLOAD)
Example use case: webhook for audio file transcription
If a large audio file is uploaded to S3, we can trigger a task to transcribe it. The upload event is received by an endpoint, which acts as a webhook and triggers the task.
peach.conf
# partial config as example
tasks:
  audio_transcription:
    method: audio_transcription_task

endpoints:
  audio_transcription_webhook:
    method: webhook_handler
from pipe_algorithms_lib.task_helpers import trigger_task_with_event

TASK_NAME = "audio_transcription"  # one of your real task names as specified in peach.conf

def audio_transcription_task(payload):
    # the payload looks like {"s3_uri": "s3://bucket/key", "language": "en"}
    s3_uri = payload["s3_uri"]
    language = payload["language"]
    # do transcription

def webhook_handler(request):
    s3_uri = request.json["s3_uri"]
    language = request.json["language"]
    payload = {"s3_uri": s3_uri, "language": language}
    trigger_task_with_event(TASK_NAME, payload)
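The overview also mentioned feeding the transcribed text to a recommendation task. A minimal sketch of that chaining, assuming a hypothetical text_recommendation task is also defined in peach.conf, would have the transcription task trigger the next stage itself:

from pipe_algorithms_lib.task_helpers import trigger_task_with_event

RECOMMENDATION_TASK = "text_recommendation"  # hypothetical task name; it would need to exist in peach.conf

def audio_transcription_task(payload):
    s3_uri = payload["s3_uri"]
    language = payload["language"]
    transcript = "..."  # placeholder for the actual transcription result
    # chain the pipeline: hand the transcript to the recommendation task
    trigger_task_with_event(RECOMMENDATION_TASK, {"text": transcript, "language": language})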
Static multi-stage pipelines (DAGs)
So far we have seen how to build dynamic event driven pipelines, where one task (or endpoint) can trigger another task, forming the pipeline graph at runtime.
But in some cases the order of task execution is known beforehand. In those cases you can build a static pipeline. A Directed Acyclic Graph (DAG) is a graph whose edges have a direction and which contains no cycles; the static multi-stage pipelines on Peach are DAGs.
For this we rely on a downstream key in the task config, which specifies the list of tasks to execute after the current task completes successfully.
Example syntax for building static DAG pipelines:
peach.conf
tasks:
  task1:
    method: task1_method
    downstream:
      - task2
      - task3
  task2:
    method: task2_method
  task3:
    method: task3_method
Passing data from one DAG stage to another
To pass data from an upstream task to a downstream task, we use the helper function pass_to_child_task, and to receive the data in the downstream task, we use the helper function read_from_parent_task.
import json
from pipe_algorithms_lib.task_helpers import pass_to_child_task, read_from_parent_task

# This is the data we want to pass from task1 to task2
PAYLOAD = {"key1": "value1", "key2": "value2"}

def task1_method(payload):
    """
    Serializes a Python dictionary to a JSON formatted string
    and passes it to the downstream tasks.
    """
    # We serialize the dictionary to a JSON string before passing it
    data_to_pass = json.dumps(PAYLOAD)
    pass_to_child_task(data_to_pass)

def task2_method():
    """
    Receives a JSON formatted string, deserializes it back
    into a Python dictionary, and can then use the data.
    """
    # The payload received is a JSON string, so we deserialize it back to a dictionary
    payload = read_from_parent_task()
    received_data = json.loads(payload)
    # now you can use the data as a dictionary:
    # print(received_data.get("key1"))
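In the example config above, task3 is also downstream of task1. Assuming the data passed with pass_to_child_task is available to each downstream task, task3 can read it with the same helper; a minimal sketch:

import json
from pipe_algorithms_lib.task_helpers import read_from_parent_task

def task3_method():
    # read and deserialize the same JSON string passed down by task1
    payload = read_from_parent_task()
    received_data = json.loads(payload)
    # use received_data like any other dictionary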
Reach out.
Do reach out to the peach team; we would be happy to help you build these new pipelines.