Pipeline

A Pipeline is an end-to-end workflow that automates a sequence of pipeline components to process data.

#Pipeline Component

A Component is a fundamental building block used to construct a pipeline in VDP. VDP has two types of components: Connectors and Operators. Both connectors and operators are served by the pipeline-backend.

VDP uses ConnectorDefinition and OperatorDefinition to define the basic properties and detailed configuration of a connector or an operator. The inner field connector_definition.spec.connection_specification defines the connector configuration JSON Schema, including resource configuration, component configuration, and OpenAPI schemas. The inner field operator_definition.spec.connection_specification defines the operator configuration JSON Schema, including component configuration and OpenAPI schemas.

#Pipeline Recipe

A pipeline is defined by a recipe, which is essentially a JSON object composed of multiple components:

  • Operators - a number of operators for flexible and fine-grained data injection and manipulation
  • Connectors - a number of connectors to query, process, or send the ingested unstructured data

{
"version": "v1alpha",
"components": [
{
"id": <component-id>,
"definition_name": <operator-definition-name>, // Operator
"configuration": {...},
...
},
{
"id": <component-id>,
"resource_name": <connector-resource-name>,
"definition_name": <connector-definition-name>, // Connector
"configuration": {...},
...
},
...
]
}

Within the recipe, we have four essential fields:

  1. id: This serves as a unique identifier within the recipe.

  2. resource_name:

  3. definition_name:

    • Represents the name of the connector or operator definition.
    • The format for this field is connector-definitions/<connector-definition-id> or operator-definitions/<operator-definition-id>.
  4. configuration: This field is used to set up the input data and parameters for the component. Detailed information can be found in the section below.
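
The field rules above can be checked with a few lines of Python. This is only a minimal sketch, not VDP's own validation: the function name validate_recipe, the component ids, the resource name placeholder, and the definition ids are all hypothetical, and only the uniqueness of id and the stated definition_name format are checked.

```python
import re

# Format stated above: connector-definitions/<id> or operator-definitions/<id>.
DEFINITION_NAME = re.compile(r"^(connector|operator)-definitions/[^/\s]+$")


def validate_recipe(recipe: dict) -> list:
    """Return a list of problems found in the recipe (empty when none are found)."""
    problems = []
    seen_ids = set()
    for index, component in enumerate(recipe.get("components", [])):
        component_id = component.get("id")
        if not component_id:
            problems.append(f"component #{index}: missing id")
        elif component_id in seen_ids:
            problems.append(f"component #{index}: duplicate id {component_id!r}")
        else:
            seen_ids.add(component_id)
        name = component.get("definition_name", "")
        if not DEFINITION_NAME.match(name):
            problems.append(f"component #{index}: bad definition_name {name!r}")
    return problems


# Hypothetical recipe: every id and name below is made up for illustration.
recipe = {
    "version": "v1alpha",
    "components": [
        {
            "id": "start",
            "definition_name": "operator-definitions/example-operator",
            "configuration": {},
        },
        {
            "id": "sink",
            "resource_name": "example-connector-resource",  # placeholder value
            "definition_name": "connector-definitions/example-connector",
            "configuration": {},
        },
    ],
}

print(validate_recipe(recipe))  # prints []
```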

#Data Flow and Input Configuration in Pipeline Components

Within pipeline components, data flow is facilitated through a JSON field called configuration.input, which serves as the means to configure both parameters and data input for the component. Understanding this field is essential for customizing the behavior of each component in a pipeline.

Example Configuration

Let's begin with an example of a configuration:


{
"input": {
"field_1_primitive_value": "this is a book",
"field_2_primitive_value": 1.0,
"field_3_reference": "${ start.a_string_field }",
"field_4_reference": "${ componentA.output.a_string_field }",
"field_5_reference": "${ componentA.output.a_number_array }",
"field_6_template": "${ componentA.output.a_string_field } and we have ${ componentB.output.a_number_field }"
}
}

The configuration.input field defines the data input and parameter settings. VDP interprets this configuration and renders it into the actual data input of the component. There are three fundamental ways to configure data:

#Primitive Value

  • Data configurations can include common JSON data types such as string, number, integer, array, and object.
  • After rendering, the value remains exactly as configured.

Example:

Consider this component configuration:


// Component configuration
{
"input": {
"field_1_primitive_value": "this is a book",
"field_2_primitive_value": 1.0
}
}

When using batch triggering in VDP, each component processes an array of inputs. With a batch size of 2 as an example, the rendered data input for this component appears as follows:


// Data input of the component
{
"inputs": [
{
"field_1_primitive_value": "this is a book",
"field_2_primitive_value": 1.0
},
{
"field_1_primitive_value": "this is a book",
"field_2_primitive_value": 1.0
}
]
}
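
The rendering of primitive values can be pictured as copying the configured input once per batch element. The following is a minimal sketch under that assumption; render_primitives is a hypothetical helper, not part of VDP.

```python
import copy


def render_primitives(input_config: dict, batch_size: int) -> dict:
    """Repeat the configured primitive values once per batch element."""
    # deepcopy keeps each batch element independent of the others.
    return {"inputs": [copy.deepcopy(input_config) for _ in range(batch_size)]}


configuration = {
    "input": {
        "field_1_primitive_value": "this is a book",
        "field_2_primitive_value": 1.0,
    }
}

rendered = render_primitives(configuration["input"], batch_size=2)
print(len(rendered["inputs"]))  # prints 2
```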

#Reference

  • A Reference uses the special syntax ${ }, i.e., a dollar sign followed by a path enclosed in curly brackets, e.g., ${ start.a_string_field }.
  • It functions as a variable reference, copying the value from an upstream component to the data input while preserving the original data type.

Example:

Suppose we have the following component configuration:


// Component configuration
{
"input": {
"field_3_reference": "${ start.a_string_field }",
"field_4_reference": "${ componentA.output.a_string_field }",
"field_5_reference": "${ componentA.output.a_number_array }"
}
}

Given the provided upstream component data, the rendered data input for this component appears as follows:


// Data input of the component
{
"inputs": [
{
"field_3_reference": "hello world",
"field_4_reference": "cat is good",
"field_5_reference": [1.0, 1.5, 2.0]
},
{
"field_3_reference": "how are you",
"field_4_reference": "dog is good",
"field_5_reference": [-1.0, -1.5, -2.0]
}
]
}
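
A reference can be thought of as a path lookup into the upstream data that returns the value unchanged. The sketch below assumes the upstream data for each batch element is available as a nested dictionary keyed by component id; that layout and the helper names lookup, render_value, and render_input are assumptions made for illustration.

```python
import re

# A whole-string reference such as "${ start.a_string_field }".
REFERENCE = re.compile(r"^\$\{\s*([\w.]+)\s*\}$")


def lookup(path: str, context: dict):
    """Follow a dotted path such as componentA.output.a_string_field."""
    value = context
    for key in path.split("."):
        value = value[key]
    return value


def render_value(value, context: dict):
    """Resolve a reference and keep its original type; pass other values through."""
    match = REFERENCE.match(value) if isinstance(value, str) else None
    return lookup(match.group(1), context) if match else value


def render_input(input_config: dict, batch_contexts: list) -> dict:
    """Render the input once per batch element."""
    return {
        "inputs": [
            {field: render_value(value, context) for field, value in input_config.items()}
            for context in batch_contexts
        ]
    }


input_config = {
    "field_3_reference": "${ start.a_string_field }",
    "field_4_reference": "${ componentA.output.a_string_field }",
    "field_5_reference": "${ componentA.output.a_number_array }",
}

# Assumed upstream data, one entry per batch element.
batch_contexts = [
    {
        "start": {"a_string_field": "hello world"},
        "componentA": {"output": {"a_string_field": "cat is good", "a_number_array": [1.0, 1.5, 2.0]}},
    },
    {
        "start": {"a_string_field": "how are you"},
        "componentA": {"output": {"a_string_field": "dog is good", "a_number_array": [-1.0, -1.5, -2.0]}},
    },
]

rendered = render_input(input_config, batch_contexts)
print(rendered["inputs"][0]["field_5_reference"])  # prints [1.0, 1.5, 2.0]
```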

#Template

  • A Template is a string that embeds one or more references, each written in the ${ } syntax, within literal text, e.g., "${ componentA.output.a_string_field } and we have ${ componentB.output.a_number_field }".
  • It retrieves values from upstream components and converts them to string type for the data input.

Example:

Consider the following component configuration:


// Component configuration
{
"input": {
"field_6_template": "${ componentA.output.a_string_field } and we have ${ componentB.output.a_number_field }"
}
}

Based on the provided upstream component data, the rendered data input for this component appears as follows:


// Data input of the component
{
"inputs": [
{
"field_6_template": "cat is good and we have 1"
},
{
"field_6_template": "dog is good and we have 2"
}
]
}
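
Template rendering differs from a plain reference in that every embedded value is converted to a string and spliced into the surrounding text. A minimal sketch follows, assuming the same nested-dictionary layout for upstream data; render_template is a hypothetical helper, and the way numbers are formatted (here Python's str) is an assumption rather than documented VDP behavior.

```python
import re

# Any "${ path }" placeholder embedded in a longer string.
PLACEHOLDER = re.compile(r"\$\{\s*([\w.]+)\s*\}")


def lookup(path: str, context: dict):
    """Follow a dotted path such as componentB.output.a_number_field."""
    value = context
    for key in path.split("."):
        value = value[key]
    return value


def render_template(template: str, context: dict) -> str:
    """Replace each placeholder with the string form of the referenced value."""
    return PLACEHOLDER.sub(lambda match: str(lookup(match.group(1), context)), template)


template = "${ componentA.output.a_string_field } and we have ${ componentB.output.a_number_field }"

# Assumed upstream data, one entry per batch element.
batch_contexts = [
    {"componentA": {"output": {"a_string_field": "cat is good"}},
     "componentB": {"output": {"a_number_field": 1}}},
    {"componentA": {"output": {"a_string_field": "dog is good"}},
     "componentB": {"output": {"a_number_field": 2}}},
]

for context in batch_contexts:
    print(render_template(template, context))
```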

#Control Flow and Condition Configuration in Pipeline Components

Within pipeline components, control flow is facilitated through a JSON field called configuration.condition, which serves as the means to specify the condition determining whether this component will be executed or not. Understanding this field is essential for setting up the control flow of a pipeline.

Example Configuration

Let's begin with an example of a configuration:


{
"condition": "${start.a_condition_str} == \"TARGET_CONDITION_STR\"",
"input": {
...
}
}

The configuration.condition field defines the condition setting. VDP interprets this configuration to decide whether the component will be executed. There are two types of conditions: "condition on value" and "condition on status".

#Condition on Value

We support the following syntax for the condition:

  • Logic
    • &&
    • ||
  • Comparison
    • <
    • >
    • <=
    • >=
    • ==
    • !=
  • Not
    • !
  • Parentheses
    • ()

Examples:

Here are some examples for the condition field:

  • Condition on string value


    {
    "condition": "${start.a_condition_str} == \"TARGET_CONDITION_STR\""
    }

  • Condition on number value


    {
    "condition": "${start.a_condition_num} > 1"
    }

  • Condition on boolean value


    {
    "condition": "${start.a_condition_bool}"
    }

  • Complex condition


    {
    "condition": "(${start.a_condition_bool} && ${compA.output.x} == 1) || ${compB.output.y} < 1"
    }

  • Always false


    {
    "condition": "false"
    }

#Condition on Status

Besides "condition on value," we also support condition on status. We provide a boolean value called status.completed, which allows you to condition on whether a component's execution is completed or not.

Examples:

Here are some examples for the condition field:

  • The component will be executed after component compA is completed; you can use this to control the execution order of components.


    {
    "condition": "${compA.status.completed}"
    }

  • You can also combine it with the value condition.


    {
    "condition": "${start.a_condition_bool} || ${compA.status.completed}"
    }
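
The ordering effect of status conditions can be sketched as follows. The sketch treats every ${<component>.status.completed} reference in a condition as something that must be completed first, so it only models conditions that are a single status reference or a conjunction (&&) of them; conditions combined with || or with value conditions are outside its scope. The function name and the component ids are hypothetical.

```python
import re

# Matches "${compA.status.completed}" and captures the component id.
STATUS_REF = re.compile(r"\$\{\s*(\w+)\.status\.completed\s*\}")


def execution_order(conditions: dict) -> list:
    """Order components so each runs after the components whose completion it awaits."""
    completed, order = set(), []
    pending = dict(conditions)
    while pending:
        ready = [
            component_id
            for component_id, condition in pending.items()
            if set(STATUS_REF.findall(condition)) <= completed
        ]
        if not ready:
            raise ValueError(f"unsatisfiable status conditions: {sorted(pending)}")
        for component_id in ready:
            order.append(component_id)
            completed.add(component_id)
            del pending[component_id]
    return order


# Hypothetical components mapped to their condition strings ("" means no condition).
conditions = {
    "compC": "${compA.status.completed} && ${compB.status.completed}",
    "compB": "${compA.status.completed}",
    "compA": "",
}
print(execution_order(conditions))  # prints ['compA', 'compB', 'compC']
```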

#Trigger Pipeline

#Trigger via SYNC API

A pipeline in the SYNC mode responds to a request synchronously. The result is sent back to the user right after the data is processed. This is for real-time inference where low latency is of concern. The request flow when triggering a SYNC pipeline is shown below:

SYNC trigger

VDP supports triggering a pipeline via the HTTP and gRPC protocols.

#Trigger via ASYNC API

A pipeline in the ASYNC mode performs an asynchronous workload. The user triggers the pipeline with an asynchronous request and receives only an acknowledgement response. This mode is for use cases that require long-running workloads.

ASYNC pipeline mode

#Trigger via PULL scheduler (coming soon)

A pipeline in the PULL mode performs a scheduled workload: it regularly pulls data from the source, processes it, and writes the results to destinations.

Pipeline PULL mode

Last updated: 3/21/2024, 1:42:31 PM