Self-hosted Tasks

Communicate with Orchestra for Tasks we cannot monitor

Orchestra needs to know the current status of every Task run so it can efficiently orchestrate the rest of the Pipeline. In the majority of cases, Orchestra maintains a connection with the integration platform and updates the status when required.

However, in some use cases Orchestra is unable to monitor a Task once it has been triggered. Examples include Tasks running in:

  • AWS Lambda

  • GCP Cloud Functions

  • Processes on any machine that are triggered via SSH

In these scenarios, the Task needs to communicate its status back to Orchestra. This is enabled via two mechanisms:

  1. Using the orchestra-sdk Python package

  2. Sending HTTP requests to our webhook API

orchestra-sdk

This package handles the communication with Orchestra for your Task run with minimal configuration. The SDK currently supports only Python. For example:

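# `orchestra` is an object provided by orchestra-sdk and created beforehand
# (see the package page on PyPI for setup details)
@orchestra.run()
def my_function(arg1, arg2=1):
    print("Running complex process")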

Orchestra triggers my_function through an integration (e.g. AWS Lambda). When my_function completes, the status of the Task is sent back to Orchestra.
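To make the pattern concrete, here is a minimal sketch of how a status-reporting decorator can work in general. It is not the orchestra-sdk implementation: the `report_status` decorator and the `send` callback are hypothetical names introduced only for illustration, and the callback here records updates in a list instead of contacting Orchestra.

```python
import functools
from typing import Callable


def report_status(send: Callable[[str, str], None]):
    """Hypothetical decorator: report SUCCEEDED or FAILED when the function ends."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                result = func(*args, **kwargs)
            except Exception as exc:
                # Report the failure, then re-raise so the caller still sees it.
                send("FAILED", f"{type(exc).__name__}: {exc}")
                raise
            send("SUCCEEDED", "The process completed successfully.")
            return result

        return wrapper

    return decorator


# Stand-in sender (an assumption for this sketch): it only records updates.
sent = []


@report_status(lambda status, message: sent.append((status, message)))
def my_function(arg1, arg2=1):
    return arg1 + arg2
```

Calling `my_function(1, arg2=2)` runs the function as usual and appends one status update to `sent`. In a real Task the callback would send the update to Orchestra instead.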

More information on the package is available on PyPI.

Sending HTTP requests

Tasks triggered in AWS Lambda or on self-hosted SSH machines may not be written in Python, or may involve more complex processes that the orchestra-sdk cannot handle.

In this case, you can send HTTP requests directly to Orchestra. This allows Tasks to be written in any language or framework, as long as the infrastructure they run on has outbound access to the Internet.

Bash example

#!/bin/bash

# Variables
API_KEY="<YOUR_API_KEY_FROM_ORCHESTRA>"
WEBHOOK_URL="https://webhook.getorchestra.io"
TASK_RUN_ID="<TASK_RUN_ID_SENT_FROM_ORCHESTRA_WHEN_TASK_TRIGGERED>"

# Function to send HTTP request to the webhook API
send_webhook() {
    local status=$1
    local message=$2
    local timestamp=$(date -u +"%Y-%m-%dT%H:%M:%SZ")

    curl -X POST "$WEBHOOK_URL" \
        -H "Authorization: Bearer $API_KEY" \
        -H "Content-Type: application/json" \
        -d "{
            \"event_type\": \"UPDATE_STATUS\",
            \"task_run_id\": \"$TASK_RUN_ID\",
            \"status\": \"$status\",
            \"message\": \"$message\",
            \"metadata\": {},
            \"timestamp\": \"$timestamp\"
        }"
}

# Run your process here
echo "Running some process..."
# Example process: sleep for 10 seconds (replace this with your actual process)
sleep 10

# Check the exit status of the process
if [ $? -eq 0 ]; then
    # Send webhook with "succeeded" status
    send_webhook "SUCCEEDED" "The process completed successfully."
else
    # Send webhook with "failed" status
    send_webhook "FAILED" "Task failed from bash."
fi

JavaScript example

const axios = require('axios');

const API_KEY = "<YOUR_API_KEY_FROM_ORCHESTRA>";
const WEBHOOK_URL = "https://webhook.getorchestra.io";
const TASK_RUN_ID = "<TASK_RUN_ID_SENT_FROM_ORCHESTRA_WHEN_TASK_TRIGGERED>";

async function sendWebhook(status, message) {
    const timestamp = new Date().toISOString();

    const data = {
        event_type: "UPDATE_STATUS",
        task_run_id: TASK_RUN_ID,
        status: status,
        message: message,
        metadata: {},
        timestamp: timestamp
    };

    try {
        await axios.post(WEBHOOK_URL, data, {
            headers: {
                "Authorization": `Bearer ${API_KEY}`,
                "Content-Type": "application/json"
            }
        });
        console.log(`Webhook sent with status: ${status}`);
    } catch (error) {
        console.error("Error sending webhook:", error.message);
    }
}

// Example process: sleep for 10 seconds
setTimeout(async () => {
    const processSucceeded = true;  // replace with actual process logic

    if (processSucceeded) {
        await sendWebhook("SUCCEEDED", "The process completed successfully.");
    } else {
        await sendWebhook("FAILED", "Task failed from Node.js.");
    }
}, 10000);

Ruby example

require 'net/http'
require 'json'
require 'uri'
require 'time' # provides Time#iso8601 on Ruby versions where it is not built in

API_KEY = "<YOUR_API_KEY_FROM_ORCHESTRA>"
WEBHOOK_URL = "https://webhook.getorchestra.io"
TASK_RUN_ID = "<TASK_RUN_ID_SENT_FROM_ORCHESTRA_WHEN_TASK_TRIGGERED>"

def send_webhook(status, message)
  timestamp = Time.now.utc.iso8601
  uri = URI(WEBHOOK_URL)

  request = Net::HTTP::Post.new(uri)
  request['Authorization'] = "Bearer #{API_KEY}"
  request['Content-Type'] = 'application/json'

  request.body = {
    event_type: "UPDATE_STATUS",
    task_run_id: TASK_RUN_ID,
    status: status,
    message: message,
    metadata: {},
    timestamp: timestamp
  }.to_json

  response = Net::HTTP.start(uri.hostname, uri.port, use_ssl: true) do |http|
    http.request(request)
  end

  puts "Webhook sent with status: #{status}" if response.is_a?(Net::HTTPSuccess)
end

# Example process: sleep for 10 seconds
sleep 10

# Check the result of the process (replace this with actual logic)
process_succeeded = true

if process_succeeded
  send_webhook("SUCCEEDED", "The process completed successfully.")
else
  send_webhook("FAILED", "Task failed from Ruby.")
end
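For Python Tasks that cannot use the orchestra-sdk, the same request can be sketched with the standard library alone. This mirrors the headers and fields of the examples above. The function names `build_request` and `send_webhook` and the 10-second timeout are choices made for this sketch, not part of any Orchestra API.

```python
import json
import urllib.request
from datetime import datetime, timezone

API_KEY = "<YOUR_API_KEY_FROM_ORCHESTRA>"
WEBHOOK_URL = "https://webhook.getorchestra.io"
TASK_RUN_ID = "<TASK_RUN_ID_SENT_FROM_ORCHESTRA_WHEN_TASK_TRIGGERED>"


def build_request(status, message):
    """Build the POST request for an UPDATE_STATUS event without sending it."""
    payload = {
        "event_type": "UPDATE_STATUS",
        "task_run_id": TASK_RUN_ID,
        "status": status,
        "message": message,
        "metadata": {},
        # UTC timestamp in the same format as the Bash example
        "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
    }
    return urllib.request.Request(
        WEBHOOK_URL,
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {API_KEY}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


def send_webhook(status, message, timeout=10):
    """Send the status update; urlopen raises HTTPError on a 4xx/5xx response."""
    request = build_request(status, message)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status
```

A Task would call `send_webhook("SUCCEEDED", "The process completed successfully.")` after its work finishes, or `send_webhook("FAILED", ...)` from an exception handler.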

HTTP Specification

For sending Task updates:

{
   "event_type": "UPDATE_STATUS", // required
   "task_run_id": "TASK_RUN_ID", // must be UUID format
   "status": "FAILED", // e.g. "SUCCEEDED" or "FAILED", as used in the examples above
   "message": "Task failed from bash", // optional string
   "metadata": {}, // optional JSON
   "timestamp": "2024-08-09" // required - must be ISO 8601 format
}
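Checking a payload against these rules before sending it can catch mistakes early. The following is a minimal client-side sketch based only on the comments in the specification above. The `validate_status_update` helper is hypothetical, and Orchestra's server-side validation may be stricter or differ in detail.

```python
import uuid
from datetime import datetime


def validate_status_update(payload: dict) -> list:
    """Return a list of problems found in an UPDATE_STATUS payload (empty if none)."""
    problems = []

    if payload.get("event_type") != "UPDATE_STATUS":
        problems.append("event_type must be UPDATE_STATUS")

    try:
        uuid.UUID(str(payload.get("task_run_id")))
    except ValueError:
        problems.append("task_run_id must be in UUID format")

    timestamp = payload.get("timestamp")
    try:
        if not isinstance(timestamp, str):
            raise ValueError("timestamp missing or not a string")
        # Older Python versions do not accept a trailing "Z", so normalise it.
        datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
    except ValueError:
        problems.append("timestamp is required and must be in ISO format")

    if "message" in payload and not isinstance(payload["message"], str):
        problems.append("message must be a string when present")

    if "metadata" in payload and not isinstance(payload["metadata"], dict):
        problems.append("metadata must be a JSON object when present")

    return problems
```

An empty list means the payload passes these local checks. Any returned strings describe what to fix before sending.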

For sending logs linked to the Task:

{
   "event_type": "LOG", // required
   "task_run_id": "TASK_RUN_ID", // must be UUID format
   "log_name": "log.txt", // required
   "data": "Hello, World!", // required string
   "metadata": {}, // optional JSON
   "timestamp": "2024-08-09" // required - must be ISO 8601 format
}
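A LOG event can be assembled in the same way as a status update. The sketch below only builds the payload. The `build_log_event` helper is a hypothetical name, and posting the result to the webhook URL with the same headers as a status update is an assumption, since the specification above does not state it explicitly.

```python
import json
from datetime import datetime, timezone


def build_log_event(task_run_id, log_name, data, metadata=None):
    """Build a LOG event payload with the fields listed in the specification."""
    if not isinstance(data, str):
        raise TypeError("data must be a string")
    return {
        "event_type": "LOG",
        "task_run_id": task_run_id,
        "log_name": log_name,
        "data": data,
        "metadata": metadata or {},
        "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
    }


# Example payload serialised to JSON, ready to be used as a request body.
# The UUID is a placeholder, not a real Task run ID.
event = build_log_event(
    "123e4567-e89b-12d3-a456-426614174000", "log.txt", "Hello, World!"
)
body = json.dumps(event)
```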
