
Event Driven Pipelines

Overview

Event driven pipelines are a new way of orchestrating workflows in Peach.

peach-tasks run on a schedule, while peach-endpoints handle realtime / streaming requests.

Until now, a peach-task would run in isolation from other peach-tasks and peach-endpoints. It would run on a schedule and do some work.

A common pattern is to store the result of the task's computation in Redis, InfluxDB, or Milvus, and have Recommendation APIs (peach-endpoints) serve recommendations from it.
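As a minimal sketch of that pattern (the function names and Redis keys here are hypothetical, and Redis is used as the example store; adapt it to your project):

import json
import redis

r = redis.Redis(host="localhost", port=6379)  # placeholder connection details

def compute_recommendations_task(payload):
    # scheduled peach-task: compute recommendations and store them
    recommendations = ["item_1", "item_7"]  # placeholder result of the computation
    r.set("recommendations:user_42", json.dumps(recommendations))

def recommendations_endpoint(request):
    # peach-endpoint: serve recommendations read back from the store
    user_id = request.json["user_id"]
    cached = r.get(f"recommendations:{user_id}")
    return json.loads(cached) if cached else []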

[Figure: event-driven pipelines]

Event driven pipelines

Now we can let peach-tasks and peach-endpoints work together and orchestrate more complex event-driven workflows.

An example would be creating a webhook for audio file transcription. The webhook would receive the audio file and trigger a task to transcribe it. The transcribed text could then be fed to a recommendation task to generate recommendations.

Syntax for triggering tasks with events

You can use a helper function to trigger a peach-task from inside other peach-tasks, peach-endpoints, and even from inside peach-lab.

Here is the syntax for triggering a task:

from pipe_algorithms_lib.task_helpers import trigger_task_with_event

TASK_NAME = "collaborative_filtering" # the name of a real task as specified in peach.conf
PAYLOAD = {"key1" : "value1", "key2" : "value2"}

trigger_task_with_event(TASK_NAME, PAYLOAD)
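On the receiving side, the triggered task's method gets the payload as its argument (as in the webhook example below). A minimal sketch, assuming peach.conf maps the collaborative_filtering task to a collaborative_filtering_method:

def collaborative_filtering_method(payload):
    # payload is the dictionary passed to trigger_task_with_event
    value1 = payload["key1"]
    value2 = payload["key2"]
    # do the work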

Example use case: webhook for audio file transcription

When a large audio file is uploaded to S3, we can trigger a task to transcribe it. The upload event is received by an endpoint, which acts as a webhook and triggers the task.

peach.conf  
# partial config as example

tasks:
    audio_transcription:
        method: audio_transcription_task
endpoints:
    audio_transcription_webhook:
        method: webhook_handler

from pipe_algorithms_lib.task_helpers import trigger_task_with_event

TASK_NAME = "audio_transcription" # must match the task name specified in peach.conf

def audio_transcription_task(payload):
    s3_uri = payload["s3_uri"]
    language = payload["language"]
    # do transcription

def webhook_handler(request):
    # build the payload from the upload event, e.g. {"s3_uri": "s3://bucket/key", "language": "en"}
    s3_uri = request.json["s3_uri"]
    language = request.json["language"]
    payload = {"s3_uri": s3_uri, "language": language}

    trigger_task_with_event(TASK_NAME, payload)
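The transcription task can then continue the chain and trigger the recommendation step described earlier. A sketch of what its body could look like, assuming a "recommendation" task is also defined in peach.conf and a transcribe helper exists in your project:

def audio_transcription_task(payload):
    s3_uri = payload["s3_uri"]
    language = payload["language"]
    transcript = transcribe(s3_uri, language)  # hypothetical transcription helper
    # feed the transcribed text to the next task in the chain
    trigger_task_with_event("recommendation", {"text": transcript})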

Static multi-stage pipelines (DAGs)

So far we have seen how to build dynamic event-driven pipelines, where one task (or endpoint) can trigger another task and the pipeline graph is built dynamically at runtime.

But in some cases the order of task execution is known beforehand. In those cases you can build a static pipeline. A Directed Acyclic Graph (DAG) is a graph that is directed (edges have a direction) and acyclic (no cycles). Static multi-stage pipelines on Peach are DAGs.

For this we rely on a downstream key in the task config to specify the list of tasks that should be executed after the current task completes successfully.

Example syntax for building static DAG pipelines:

peach.conf

tasks:
    task1:
        method: task1_method
        downstream:
            - task2
            - task3
    task2:
        method: task2_method
    task3:
        method: task3_method
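With this config, task2_method and task3_method are executed after task1_method completes successfully. A minimal sketch of the corresponding methods (bodies are placeholders; data passing between stages is covered below):

def task1_method(payload):
    # first stage of the DAG
    ...

def task2_method():
    # runs after task1 completes successfully
    ...

def task3_method():
    # also runs after task1 completes successfully
    ...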

Passing data from one DAG stage to another

To pass data from an upstream task to a downstream task, use the helper function pass_to_child_task. To receive the data in the downstream task, use the helper function read_from_parent_task.

import json
from pipe_algorithms_lib.task_helpers import pass_to_child_task, read_from_parent_task

# This is the data we want to pass from task1 to task2
PAYLOAD = {"key1": "value1", "key2": "value2"}

def task1_method(payload):
    """
    This task serializes a Python dictionary to a JSON formatted string
    and passes it to its downstream tasks.
    """
    # We serialize the dictionary to a JSON string before passing it
    data_to_pass = json.dumps(PAYLOAD)
    pass_to_child_task(data_to_pass)

def task2_method():
    """
    This task receives a JSON formatted string, deserializes it back
    into a Python dictionary, and can then use the data.
    """
    payload = read_from_parent_task()

    # The payload received is a JSON string, so we deserialize it back to a dictionary
    received_data = json.loads(payload)

    # now you can use the data as a dictionary:
    # print(received_data.get("key1"))
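
Since task3 is also listed as a downstream of task1 in the config above, its method can read the same payload in the same way; a minimal sketch:

def task3_method():
    # task3 also receives the JSON string passed by task1
    received_data = json.loads(read_from_parent_task())
    # use received_data as a dictionary, e.g. received_data.get("key2")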

Reach out.

Do reach out to the Peach team; we would be happy to help you build these new pipelines.