CrewAI Content Orchestrator API Wrapper

Note

This documentation explains how to use the CrewAI Content Orchestrator API Wrapper to expose your CrewAI agents and workflows as a RESTful API.

Overview

The CrewAI Content Orchestrator API Wrapper (api_wrapper.py) is a FastAPI service that exposes your CrewAI agents and workflows as a RESTful API. This enables:

  • Asynchronous execution of CrewAI workflows
  • Human-in-the-Loop (HITL) approval processes
  • Webhook notifications for job status updates
  • Job tracking across multiple concurrent executions

Quick Start

Click to expand quick start instructions
  1. Set up your environment:

    # Install dependencies
    pip install -r requirements.txt
    
    # Copy and configure environment variables
    cp .env.sample .env
    # Edit .env with your API keys
  2. Run the API service:

    bash scripts/run_api.sh
    # Or directly with uvicorn:
    # uvicorn api_wrapper.api_wrapper:app --host 0.0.0.0 --port 8888 --timeout-keep-alive 300
  3. Make a request:

    curl -X POST http://localhost:8888/kickoff \
      -H "Content-Type: application/json" \
      -d '{
        "crew": "ContentCreationCrew",
        "inputs": {
          "topic": "Artificial Intelligence"
        }
      }'

Integration Guide

Code Structure Requirements

The API wrapper supports two integration patterns:

1. CrewBase Pattern (Recommended)

Click to see CrewBase pattern example
from crewai import Agent, Crew, Process, Task
from crewai.project import CrewBase, agent, crew, task

@CrewBase
class MyContentCrew:
    """Content creation crew for generating articles"""
    
    def __init__(self, inputs=None):
        """Initialize with inputs from API"""
        self.inputs = inputs or {}
    
    @agent
    def researcher(self) -> Agent:
        """Define a research agent"""
        return Agent(
            role="Research Specialist",
            goal="Find comprehensive information on the topic",
            backstory="You are an expert researcher with years of experience",
            llm=self._get_llm(),
            verbose=True,
        )
    
    @task
    def research_task(self) -> Task:
        """Define a research task"""
        topic = self.inputs.get("topic", "General Knowledge")
        return Task(
            description=f"Research the topic: {topic}",
            expected_output="A detailed research report",
            agent=self.researcher(),
            human_input=False,
        )
    
    @crew
    def content_crew(self) -> Crew:
        """Define your crew - this will be discovered by the API wrapper"""
        return Crew(
            agents=[self.researcher()],
            tasks=[self.research_task()],
            process=Process.sequential,
            verbose=True,
        )
        
    def _get_llm(self):
        """Configure LLM based on environment variables"""
        # LLM configuration code here

The API wrapper automatically discovers methods decorated with @crew and makes them available as endpoints.
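The discovery mechanism can be approximated as follows. This is an illustrative sketch only, not the wrapper's actual implementation: the toy `crew` decorator and the `is_crew` marker attribute are assumptions made for demonstration.

```python
import inspect

def crew(fn):
    """Toy stand-in for crewai's @crew decorator (assumption: the real
    decorator tags the wrapped function so it can be found later)."""
    fn.is_crew = True
    return fn

def discover_crews(instance):
    """Return all bound methods on `instance` tagged as crews."""
    return {
        name: method
        for name, method in inspect.getmembers(instance, inspect.ismethod)
        if getattr(method, "is_crew", False)
    }

class ExampleCrew:
    @crew
    def content_crew(self):
        return "crew-object"

    def helper(self):
        return None
```

Each discovered method name then becomes a valid value for the `crew` field of the kickoff request.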

2. Direct Function Pattern

Note

The Direct Function Pattern is currently experimental and may not be fully supported in all versions of the API wrapper. The CrewBase Pattern is the recommended approach.

Click to see Direct Function pattern example (experimental)
from datetime import datetime
from typing import Any, Dict, Optional

def create_content_with_hitl(
    topic: str,
    feedback: Optional[str] = None,
    require_approval: bool = True,
) -> Dict[str, Any]:
    """
    Content creation function with human-in-the-loop capability
    
    Args:
        topic: The topic to create content about
        feedback: Optional human feedback for refinement
        require_approval: Whether to require human approval
        
    Returns:
        Dictionary with content and status information
    """
    # Implementation here
    return {
        "status": "needs_approval",  # or "success" or "error"
        "content": "Generated content...",
        "length": 1234,
        "timestamp": datetime.now().isoformat(),
    }

Environment Configuration

Set the path to your CrewAI code in the DATA_APP_ENTRYPOINT environment variable:

# In .env file
DATA_APP_ENTRYPOINT="crewai_app/orchestrator.py"
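Under the hood, the wrapper has to import the file named by DATA_APP_ENTRYPOINT at runtime. A minimal sketch of such dynamic loading using standard importlib machinery (the wrapper's actual loader may differ):

```python
import importlib.util

def load_entrypoint(path: str):
    """Dynamically import a Python file by path and return the module."""
    spec = importlib.util.spec_from_file_location("user_module", path)
    if spec is None or spec.loader is None:
        raise ImportError(f"Cannot load entrypoint: {path}")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)  # executes the file's top-level code
    return module
```

If this step fails, see the "Module Loading Issues" entry in the Troubleshooting section.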

API Endpoints

Health Check

Endpoint: GET /health

Returns the health status of the API.

curl http://localhost:8888/health
Click to see response example
{
  "status": "healthy",
  "timestamp": "2023-06-15T12:34:56.789012",
  "module_loaded": true,
  "active_jobs": 2
}

Kickoff Endpoint

Endpoint: POST /kickoff

Starts a new content generation job.

Click to see request example
curl -X POST http://localhost:8888/kickoff \
  -H "Content-Type: application/json" \
  -d '{
    "crew": "ContentCreationCrew",
    "inputs": {
      "topic": "Artificial Intelligence",
      "require_approval": true
    },
    "webhook_url": "https://your-webhook-endpoint.com/webhook",
    "wait": false
  }'

Parameters:

  • crew (string): The name of the crew class to use
  • inputs (object): Input parameters for the crew
  • webhook_url (string, optional): URL to receive job status updates
  • wait (boolean, optional): Whether to wait for job completion (default: false)

Warning

The wait=true parameter is not currently functional. All jobs are processed asynchronously regardless of this setting.

Click to see response examples

Response (Asynchronous):

{
  "job_id": "987ca65a-62cf-4c48-850b-ad0eb3e37393",
  "status": "queued",
  "message": "Crew kickoff started in the background"
}

Response (Synchronous, with wait=true — not currently functional; see the warning above):

{
  "job_id": "987ca65a-62cf-4c48-850b-ad0eb3e37393",
  "status": "completed",
  "result": {
    "content": "Generated content...",
    "length": 1234
  }
}
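For programmatic clients, the request body shown above can be assembled with a small helper. This is a hypothetical convenience function (the field names match the parameters documented above); the HTTP call is shown commented out so the snippet stays self-contained:

```python
def build_kickoff_payload(crew, inputs, webhook_url=None, wait=False):
    """Assemble the JSON body for POST /kickoff."""
    payload = {"crew": crew, "inputs": inputs, "wait": wait}
    if webhook_url is not None:
        payload["webhook_url"] = webhook_url
    return payload

payload = build_kickoff_payload(
    "ContentCreationCrew",
    {"topic": "Artificial Intelligence"},
    webhook_url="https://your-webhook-endpoint.com/webhook",
)
# import requests
# resp = requests.post("http://localhost:8888/kickoff", json=payload)
# job_id = resp.json()["job_id"]
```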

Job Status

Endpoint: GET /job/{job_id}

Retrieves the status and result of a specific job.

curl http://localhost:8888/job/987ca65a-62cf-4c48-850b-ad0eb3e37393
Click to see response example
{
  "id": "987ca65a-62cf-4c48-850b-ad0eb3e37393",
  "crew": "ContentCreationCrew",
  "inputs": {
    "topic": "Artificial Intelligence"
  },
  "status": "completed",
  "created_at": "2023-06-15T12:34:56.789012",
  "completed_at": "2023-06-15T12:40:56.789012",
  "result": {
    "content": "Generated content...",
    "length": 1234
  }
}

Feedback Endpoint

Endpoint: POST /job/{job_id}/feedback

Provides human feedback for a job that's pending approval.

Click to see request example
curl -X POST http://localhost:8888/job/987ca65a-62cf-4c48-850b-ad0eb3e37393/feedback \
  -H "Content-Type: application/json" \
  -d '{
    "feedback": "Please make the content more concise and add more examples.",
    "approved": false
  }'

Parameters:

  • feedback (string): Human feedback on the content
  • approved (boolean): Whether to approve the content as is
Click to see response examples

Response (Approved):

{
  "message": "Feedback recorded and job marked as completed",
  "job_id": "987ca65a-62cf-4c48-850b-ad0eb3e37393"
}

Response (Not Approved):

{
  "message": "Feedback recorded and content generation restarted with feedback",
  "job_id": "987ca65a-62cf-4c48-850b-ad0eb3e37393"
}

List Jobs

Endpoint: GET /jobs

Lists all jobs with optional filtering.

curl "http://localhost:8888/jobs?limit=5&status=completed"

Query Parameters:

  • limit (integer, optional): Maximum number of jobs to return (default: 10)
  • status (string, optional): Filter jobs by status
Click to see response example
{
  "jobs": [
    {
      "id": "987ca65a-62cf-4c48-850b-ad0eb3e37393",
      "crew": "ContentCreationCrew",
      "status": "completed",
      "created_at": "2023-06-15T12:34:56.789012",
      "completed_at": "2023-06-15T12:40:56.789012"
    }
  ],
  "count": 1,
  "total_jobs": 15
}

List Crews

Endpoint: GET /list-crews

Lists all available crews that can be used with the kickoff endpoint.

curl http://localhost:8888/list-crews
Click to see response example
{
  "crews": [
    "ContentCreationCrew",
    "content_crew",
    "content_crew_with_feedback"
  ]
}

Delete Job

Endpoint: DELETE /job/{job_id}

Deletes a job and its associated data.

curl -X DELETE http://localhost:8888/job/987ca65a-62cf-4c48-850b-ad0eb3e37393
Click to see response example
{
  "message": "Job deleted successfully",
  "job_id": "987ca65a-62cf-4c48-850b-ad0eb3e37393"
}

Webhook Notifications

The API wrapper can send webhook notifications to a URL you provide when a job's status changes.

Click to see webhook payload example
{
  "job_id": "987ca65a-62cf-4c48-850b-ad0eb3e37393",
  "status": "completed",
  "crew": "ContentCreationCrew",
  "completed_at": "2023-06-15T12:40:56.789012",
  "result": {
    "content": "Generated content...",
    "length": 1234
  }
}

Webhook Events

The following events trigger webhook notifications:

  1. Job Completed: When a job finishes successfully
  2. Job Error: When a job encounters an error
  3. Pending Approval: When a job is waiting for human approval
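A receiving endpoint only needs to branch on the payload's status field. A minimal handler sketch (the dispatch actions and return strings are illustrative, not prescribed by the wrapper):

```python
def handle_webhook(payload: dict) -> str:
    """Route a webhook notification by job status; returns the action taken."""
    status = payload.get("status")
    if status == "completed":
        return f"store result for job {payload['job_id']}"
    if status == "error":
        return f"alert on job {payload['job_id']}"
    if status == "pending_approval":
        return f"notify reviewer for job {payload['job_id']}"
    return "ignore"
```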

Job States

A job can be in one of the following states:

  • queued: Job has been created and is waiting to be processed
  • processing: Job is currently being processed
  • pending_approval: Job is waiting for human approval
  • completed: Job has completed successfully
  • error: Job encountered an error
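The lifecycle above implies a simple state machine. A sketch of the legal transitions, inferred from the documented states and the feedback endpoint's behavior (the wrapper itself may permit others):

```python
# Transitions inferred from the documented job states
TRANSITIONS = {
    "queued": {"processing"},
    "processing": {"pending_approval", "completed", "error"},
    "pending_approval": {"processing", "completed"},  # feedback restarts or approves
    "completed": set(),
    "error": set(),
}

def can_transition(current: str, new: str) -> bool:
    """Check whether a job may legally move from `current` to `new`."""
    return new in TRANSITIONS.get(current, set())
```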

Implementation Examples

Basic Content Generation

Click to see basic content generation example
# In orchestrator.py
import os

from crewai import Agent, Crew, Process, Task
from crewai.project import CrewBase, agent, crew, task
from langchain_openai import ChatOpenAI
@CrewBase
class ContentCreationCrew:
    """Content creation crew for generating articles"""
    
    def __init__(self, inputs=None):
        self.inputs = inputs or {}
    
    @agent
    def writer_agent(self) -> Agent:
        return Agent(
            role="Content Writer",
            goal="Create engaging content",
            backstory="You are a skilled writer",
            llm=self._get_llm(),
            verbose=True,
        )
    
    @task
    def writing_task(self) -> Task:
        topic = self.inputs.get("topic", "General Knowledge")
        return Task(
            description=f"Write about {topic}",
            expected_output="A well-structured article",
            agent=self.writer_agent(),
            human_input=False,
        )
    
    @crew
    def content_crew(self) -> Crew:
        return Crew(
            agents=[self.writer_agent()],
            tasks=[self.writing_task()],
            process=Process.sequential,
            verbose=True,
        )
    
    def _get_llm(self):
        # Use OPENAI_API_KEY for OpenRouter
        api_key = os.environ.get("OPENAI_API_KEY")
        if not api_key:
            raise ValueError("OPENAI_API_KEY must be set")
        
        return ChatOpenAI(
            model="openai/gpt-4o-mini",
            api_key=api_key,
            base_url="https://openrouter.ai/api/v1",
            temperature=0.7,
        )

Human-in-the-Loop Workflow

Click to see HITL workflow implementation example
# In orchestrator.py
@CrewBase
class ContentCreationCrew:
    # ... other methods ...
    
    @task
    def editing_with_feedback_task(self) -> Task:
        feedback = self.inputs.get("feedback", "Please improve the content.")
        return Task(
            description=f"Edit the content incorporating this feedback: {feedback}",
            expected_output="A polished article addressing the feedback",
            agent=self.editor_agent(),
            context=[self.writing_task()],
            human_input=False,
        )
    
    @crew
    def content_crew_with_feedback(self) -> Crew:
        return Crew(
            agents=[self.writer_agent(), self.editor_agent()],
            tasks=[self.writing_task(), self.editing_with_feedback_task()],
            process=Process.sequential,
            verbose=True,
        )

HITL Workflow Example

Tip

Human-in-the-Loop (HITL) workflows allow for human feedback and approval during the content generation process.

Click to see complete HITL workflow example
  1. Start a job requiring approval:

    curl -X POST http://localhost:8888/kickoff \
      -H "Content-Type: application/json" \
      -d '{
        "crew": "ContentCreationCrew",
        "inputs": {
          "topic": "Climate Change",
          "require_approval": true
        }
      }'
  2. Check job status until it's pending approval:

    curl http://localhost:8888/job/YOUR_JOB_ID
  3. Provide feedback or approve:

    # To approve:
    curl -X POST http://localhost:8888/job/YOUR_JOB_ID/feedback \
      -H "Content-Type: application/json" \
      -d '{
        "feedback": "Content approved as is.",
        "approved": true
      }'
    
    # To request changes:
    curl -X POST http://localhost:8888/job/YOUR_JOB_ID/feedback \
      -H "Content-Type: application/json" \
      -d '{
        "feedback": "Please add more examples about renewable energy.",
        "approved": false
      }'
  4. If feedback was provided, poll the job status until it returns to "pending_approval", then review the updated content.
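Steps 2 and 4 both amount to polling GET /job/{job_id} until a target status appears. A polling sketch with the HTTP call injected as a callable so it can run without a live server (`fetch_status` is a hypothetical function you supply):

```python
import time

def wait_for_status(fetch_status, target, attempts=30, delay=2.0):
    """Poll `fetch_status()` (which returns a status string) until it
    equals `target`; raises on job failure or timeout."""
    for _ in range(attempts):
        status = fetch_status()
        if status == target:
            return status
        if status == "error":
            raise RuntimeError("job failed")
        time.sleep(delay)
    raise TimeoutError(f"status never reached {target!r}")
```

With `requests`, `fetch_status` could be `lambda: requests.get(f"http://localhost:8888/job/{job_id}").json()["status"]`.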

Environment Variables

Important

Make sure to set all required environment variables before running the API wrapper.

Required Variables

  • DATA_APP_ENTRYPOINT: Path to your CrewAI code file (e.g., crewai_app/orchestrator.py)

LLM Provider Configuration

OpenRouter Configuration (Default)
  • LLM_PROVIDER=openrouter: Set to use OpenRouter
  • OPENAI_API_KEY: Your API key for OpenRouter
  • OPENAI_API_BASE=https://openrouter.ai/api/v1: The OpenRouter API base URL
  • OPENROUTER_MODEL=openai/gpt-4o-mini: The model to use
Azure OpenAI Configuration
  • LLM_PROVIDER=azure: Set to use Azure OpenAI
  • AZURE_OPENAI_API_KEY: Your Azure OpenAI API key
  • AZURE_OPENAI_ENDPOINT: Your Azure OpenAI endpoint URL
  • AZURE_OPENAI_API_VERSION=2023-05-15: The API version
  • AZURE_OPENAI_DEPLOYMENT_ID=gpt-35-turbo-0125: The deployment ID
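The two provider blocks above reduce to a selection helper driven by LLM_PROVIDER. An illustrative sketch (variable names follow the tables above; the returned dict is a plain configuration, not a real LLM client):

```python
import os

def llm_config(env=os.environ):
    """Pick LLM settings based on LLM_PROVIDER (default: openrouter)."""
    provider = env.get("LLM_PROVIDER", "openrouter")
    if provider == "azure":
        return {
            "provider": "azure",
            "api_key": env["AZURE_OPENAI_API_KEY"],
            "endpoint": env["AZURE_OPENAI_ENDPOINT"],
            "api_version": env.get("AZURE_OPENAI_API_VERSION", "2023-05-15"),
            "deployment": env.get("AZURE_OPENAI_DEPLOYMENT_ID", "gpt-35-turbo-0125"),
        }
    return {
        "provider": "openrouter",
        "api_key": env["OPENAI_API_KEY"],
        "base_url": env.get("OPENAI_API_BASE", "https://openrouter.ai/api/v1"),
        "model": env.get("OPENROUTER_MODEL", "openai/gpt-4o-mini"),
    }
```

Missing required keys raise a KeyError immediately, which surfaces misconfiguration at startup rather than mid-job.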

Troubleshooting

Warning

The following are common issues you might encounter when using the API wrapper, along with ways to solve them.

Module Loading Issues

Problem: API fails to load your module

Error: Failed to load user script: No module named 'crewai_app'

Solutions:

  • Verify DATA_APP_ENTRYPOINT is set correctly in your .env file
  • Ensure the Python path includes your project root
  • Check that the file exists and has the correct permissions
Crew Discovery Issues

Problem: API can't find your crew

Error: Crew ContentCreationCrew not found in user module

Solutions:

  • Ensure your class is decorated with @CrewBase
  • Check that at least one method is decorated with @crew
  • Verify the class name matches what you're passing to the API
API Key Issues

Problem: Authentication errors with LLM providers

Error: OPENAI_API_KEY must be set for OpenRouter

Solutions:

  • Set OPENAI_API_KEY in your .env file
  • For Azure, ensure AZURE_OPENAI_API_KEY and AZURE_OPENAI_ENDPOINT are set
  • Verify the API keys are valid and have not expired
Job Execution Issues

Problem: Jobs fail to execute or get stuck

Error: 'NoneType' object has no attribute 'kickoff'

Solutions:

  • Check that your crew method returns a valid Crew object
  • Ensure all required inputs are provided
  • Look for exceptions in your agent or task code
  • Verify LLM configuration is correct

Advanced Configuration

API Server Configuration

# In .env file
API_HOST=0.0.0.0
API_PORT=8888
API_WORKERS=1
API_LOG_LEVEL=info

Webhook Configuration

# In .env file
WEBHOOK_RETRY_ATTEMPTS=3
WEBHOOK_RETRY_DELAY=5
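The retry settings above suggest a fixed-delay retry loop. A sketch assuming a `send` callable that raises on failure (the wrapper's actual retry behavior may differ):

```python
import time

def deliver_with_retry(send, payload, attempts=3, delay=5.0):
    """Call `send(payload)`, retrying up to `attempts` times with a fixed
    delay; re-raises the last error if every attempt fails."""
    last_error = None
    for attempt in range(attempts):
        try:
            return send(payload)
        except Exception as exc:  # broad on purpose: delivery errors vary
            last_error = exc
            if attempt < attempts - 1:
                time.sleep(delay)
    raise last_error
```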

Job Storage

# In .env file
JOB_STORAGE_TYPE=memory  # or 'redis'
REDIS_URL=redis://localhost:6379/0  # if using redis
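The memory backend amounts to a dict keyed by job ID. A minimal in-memory store sketch matching the job fields shown earlier (class and method names are illustrative; a Redis backend would expose the same interface):

```python
class MemoryJobStore:
    """Dict-backed job store; a redis-backed variant would mirror this API."""

    def __init__(self):
        self._jobs = {}

    def save(self, job_id, job):
        self._jobs[job_id] = job

    def get(self, job_id):
        return self._jobs.get(job_id)

    def list(self, status=None, limit=10):
        jobs = [j for j in self._jobs.values()
                if status is None or j.get("status") == status]
        return jobs[:limit]

    def delete(self, job_id):
        return self._jobs.pop(job_id, None) is not None
```

Note that with `JOB_STORAGE_TYPE=memory`, job history is lost whenever the API process restarts.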

Security Best Practices

Caution

Implementing proper security measures is crucial when deploying to production environments.

When deploying to production:

  1. Use HTTPS with a valid SSL certificate
  2. Implement authentication using API keys or OAuth
  3. Restrict CORS to trusted domains only
  4. Validate webhook URLs against a whitelist
  5. Set appropriate timeouts for long-running operations
  6. Monitor and rate limit requests to prevent abuse
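Item 4 above can be implemented with a hostname whitelist. A sketch using only the standard library (the allowed-host set is, of course, deployment-specific):

```python
from urllib.parse import urlparse

def is_allowed_webhook(url: str, allowed_hosts: set) -> bool:
    """Accept only HTTPS webhook URLs whose hostname is whitelisted."""
    parsed = urlparse(url)
    return parsed.scheme == "https" and parsed.hostname in allowed_hosts
```

Validating the URL at kickoff time (before the job runs) keeps the API from being used to probe internal network addresses.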

Further Resources