TL;DRBuild a content humanization pipeline in Python by chaining AI generation, the AI Humanizer API call, and your CMS publish step. Use the batch endpoint for daily content runs. Confidence scores below 0.85 should route to a human review queue.

You’ve got 500 articles that need humanization. You could click through the API dashboard 500 times, pasting content and copying results. Or you could build a pipeline that does it automatically.

This post walks you through building a production-ready Python script that processes your entire content library with the Humanizer API. No manual work. No babysitting. Just fire and forget.

Why You Need a Pipeline

Humanizing content at scale changes everything about your content strategy. Instead of manually refining your best posts, you can systematically improve your entire library. But only if you automate it.

A pipeline handles the boring parts. It reads your content from a CSV file. It submits each piece to the Humanizer API. It waits for results. It saves the humanized versions back. It tracks what failed and retries. It logs everything so you know what happened.

Without a pipeline, you’re limited to whatever you can process manually. With one, you can humanize 10,000 articles overnight.

The Basic Pipeline Structure

Here’s what a minimal pipeline looks like.

import asyncio
import aiohttp
import csv
import json
from datetime import datetime

class HumanizerPipeline:
    def __init__(self, api_key, input_file, output_file):
        self.api_key = api_key
        self.input_file = input_file
        self.output_file = output_file
        self.base_url = "https://api.aihumanizerapi.com/v1"
        self.rate_limit_delay = 0.1  # 100ms between requests
        self.results = []
        self.errors = []

    async def humanize_text(self, session, text, title=""):
        """Send a single text to the Humanizer API"""
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        payload = {
            "text": text,
            "mode": "balanced"
        }
        
        try:
            async with session.post(
                f"{self.base_url}/humanize",
                headers=headers,
                json=payload,
                timeout=aiohttp.ClientTimeout(total=30)
            ) as response:
                if response.status == 200:
                    data = await response.json()
                    return {
                        "original": text[:100],
                        "humanized": data.get("humanized_text", ""),
                        "title": title,
                        "status": "success"
                    }
                else:
                    return {
                        "original": text[:100],
                        "title": title,
                        "status": "error",
                        "error": f"API returned {response.status}"
                    }
        except asyncio.TimeoutError:
            return {
                "original": text[:100],
                "title": title,
                "status": "error",
                "error": "Request timeout"
            }
        except Exception as e:
            return {
                "original": text[:100],
                "title": title,
                "status": "error",
                "error": str(e)
            }

    async def process_batch(self, texts, titles=None):
        """Process multiple texts concurrently with rate limiting"""
        if titles is None:
            titles = [""] * len(texts)
        
        async with aiohttp.ClientSession() as session:
            tasks = []
            for text, title in zip(texts, titles):
                tasks.append(self.humanize_text(session, text, title))
                await asyncio.sleep(self.rate_limit_delay)
            
            results = await asyncio.gather(*tasks)
            return results

    async def run(self):
        """Main pipeline execution"""
        print(f"Starting humanization pipeline at {datetime.now()}")
        
        # Read input CSV
        texts = []
        titles = []
        try:
            with open(self.input_file, 'r', encoding='utf-8') as f:
                reader = csv.DictReader(f)
                for row in reader:
                    texts.append(row.get('content', ''))
                    titles.append(row.get('title', ''))
        except FileNotFoundError:
            print(f"Error: Input file {self.input_file} not found")
            return False
        
        print(f"Loaded {len(texts)} articles from {self.input_file}")
        
        # Process in batches of 100 to avoid memory issues
        batch_size = 100
        all_results = []
        
        for i in range(0, len(texts), batch_size):
            batch_texts = texts[i:i+batch_size]
            batch_titles = titles[i:i+batch_size]
            
            print(f"Processing batch {i//batch_size + 1}...")
            batch_results = await self.process_batch(batch_texts, batch_titles)
            all_results.extend(batch_results)
            
            # Log progress
            successful = sum(1 for r in batch_results if r['status'] == 'success')
            print(f"Batch complete: {successful}/{len(batch_results)} successful")
        
        # Write results to CSV
        try:
            with open(self.output_file, 'w', newline='', encoding='utf-8') as f:
                fieldnames = ['title', 'status', 'original_preview', 'humanized_text', 'error']
                writer = csv.DictWriter(f, fieldnames=fieldnames)
                writer.writeheader()
                
                for result in all_results:
                    writer.writerow({
                        'title': result.get('title', ''),
                        'status': result.get('status', ''),
                        'original_preview': result.get('original', ''),
                        'humanized_text': result.get('humanized', ''),
                        'error': result.get('error', '')
                    })
        except Exception as e:
            print(f"Error writing output file: {e}")
            return False
        
        # Summary
        success_count = sum(1 for r in all_results if r['status'] == 'success')
        error_count = sum(1 for r in all_results if r['status'] == 'error')
        
        print(f"nPipeline complete at {datetime.now()}")
        print(f"Total: {len(all_results)} | Success: {success_count} | Errors: {error_count}")
        
        return True

# Usage
if __name__ == "__main__":
    pipeline = HumanizerPipeline(
        api_key="your-api-key-here",
        input_file="articles.csv",
        output_file="humanized_results.csv"
    )
    
    asyncio.run(pipeline.run())

This script does the heavy lifting. It reads a CSV file with your articles. It submits each one to the API. It waits for responses. It writes the humanized versions to a new CSV file.

The key is the async handling. By using asyncio and aiohttp, you can send multiple requests to the API simultaneously instead of waiting for each one to finish before starting the next. On a typical connection, this cuts your processing time in half or more.

Input and Output Format

Your input CSV needs two columns: title and content.

title,content
"First Blog Post","Long form content here that needs humanization..."
"Second Post","More content to process..."
"Third Article","Additional content..."

The script outputs a CSV with the results.

title,status,original_preview,humanized_text,error
"First Blog Post",success,"Long form content here...",humanized version here,
"Second Post",success,"More content to process...","humanized version",
"Third Article",error,"Additional content...","","API returned 429"

The status column tells you what happened. Success means the humanization worked. Error means something failed, and the error column explains why.

Rate Limit Management

The Humanizer API has rate limits. If you send too many requests too fast, you’ll get throttled. The pipeline handles this with a simple delay between requests.

The line `self.rate_limit_delay = 0.1` means wait 100 milliseconds between requests. That’s conservative and keeps you well under most rate limits. If you have a higher tier plan with more generous limits, you can reduce this to 0.05 or even 0.01.

If the API returns a 429 (rate limited) error, the pipeline logs it and moves on. In production, you’d want to add retry logic with exponential backoff. That means waiting a bit longer and trying again.

async def humanize_text_with_retry(self, session, text, title="", retries=3):
    """Humanize text with automatic retry on rate limit"""
    for attempt in range(retries):
        result = await self.humanize_text(session, text, title)
        
        if result['status'] == 'success':
            return result
        
        if result.get('error', '').startswith('429'):
            wait_time = 2 ** attempt  # Exponential backoff: 1s, 2s, 4s
            print(f"Rate limited. Waiting {wait_time}s before retry...")
            await asyncio.sleep(wait_time)
            continue
        
        # Don't retry on other errors
        return result
    
    return result

With this version, if the API throttles you, the pipeline waits 1 second and tries again. If it’s throttled again, it waits 2 seconds. Then 4 seconds. Most temporary rate limits clear within a few seconds, so this gets you through.

Handling Failures Gracefully

Not every request will succeed. Network hiccups happen. The API might be temporarily down. Your content might be malformed. A production pipeline handles these cases.

The script already logs errors in the output CSV. But you might want to know about them in real time. Add logging to see what’s happening as the pipeline runs.

import logging

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('pipeline.log'),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger(__name__)

# In your pipeline class:
async def humanize_text(self, session, text, title=""):
    # ... existing code ...
    
    if response.status == 200:
        logger.info(f"Successfully humanized: {title}")
        # ... existing code ...
    else:
        logger.error(f"Failed to humanize {title}: {response.status}")
        # ... existing code ...

Now the pipeline writes what it’s doing to both the console and a log file. If something goes wrong, you can look at the log file and see exactly where it failed.

Running on a Schedule

You don’t want to manually run this script every time you have new content. Set it up as a cron job on your server.

# Run humanization pipeline daily at 2 AM
0 2 * * * cd /home/user/pipeline && python run_pipeline.py >> pipeline_cron.log 2>&1

This runs the pipeline every night when traffic is low. Your content gets humanized automatically. You wake up to a CSV file full of results.

For more complex scheduling, consider a task queue like Celery or a managed service like AWS Lambda. But cron works fine for most cases.

Scaling Beyond CSV

Once you’ve got CSV working, you can extend the pipeline to pull content from anywhere. A database. Your CMS. An S3 bucket. The pattern stays the same.

Pull content from the source. Submit to the Humanizer API. Save results back to the destination. Track what succeeded and what failed.

For WordPress, you could query the database directly and update posts after humanization.

import mysql.connector

class WordPressPipeline(HumanizerPipeline):
    def __init__(self, api_key, db_host, db_user, db_password, db_name):
        super().__init__(api_key, "", "")
        self.db_config = {
            'host': db_host,
            'user': db_user,
            'password': db_password,
            'database': db_name
        }
    
    async def fetch_posts_from_wp(self):
        """Get unpublished/draft posts from WordPress database"""
        conn = mysql.connector.connect(**self.db_config)
        cursor = conn.cursor(dictionary=True)
        cursor.execute(
            "SELECT ID, post_title, post_content FROM wp_posts WHERE post_status='draft'"
        )
        posts = cursor.fetchall()
        cursor.close()
        conn.close()
        return posts
    
    async def update_post_in_wp(self, post_id, humanized_content):
        """Save humanized content back to WordPress"""
        conn = mysql.connector.connect(**self.db_config)
        cursor = conn.cursor()
        cursor.execute(
            "UPDATE wp_posts SET post_content=%s WHERE ID=%s",
            (humanized_content, post_id)
        )
        conn.commit()
        cursor.close()
        conn.close()

Now your pipeline can pull draft posts directly from WordPress, humanize them, and save them back. No CSV files. No manual steps.

Monitoring and Alerts

When you run pipelines automatically, you need to know if something breaks. Set up basic monitoring.

Check the output CSV for error counts. If more than 10 percent of requests failed, something’s wrong. Send yourself an alert.

def check_pipeline_health(output_file):
    """Alert if error rate exceeds threshold"""
    error_count = 0
    total_count = 0
    
    with open(output_file, 'r') as f:
        reader = csv.DictReader(f)
        for row in reader:
            total_count += 1
            if row['status'] == 'error':
                error_count += 1
    
    error_rate = error_count / total_count if total_count > 0 else 0
    
    if error_rate > 0.1:  # More than 10% errors
        send_alert(f"Pipeline error rate: {error_rate*100:.1f}%")

For production deployments, integrate with a monitoring tool like DataDog or New Relic. Log pipeline runs and alert on failures. This prevents silent failures where your pipeline runs but produces no valid results.

Cost Estimation

The Humanizer API charges based on words processed. A 1,000-word article costs less than a 5,000-word guide. When you’re processing thousands of articles, costs add up fast.

Before running a large pipeline, calculate the cost. If you have 10,000 articles averaging 2,000 words each, that’s 20 million words. At typical API pricing, that’s meaningful money.

But the trade-off is clear. You can either manually edit 10,000 articles (impossible for most teams) or spend a few hundred dollars to automate it. Automation wins every time.

Getting Started

Start with a test CSV. Five articles. See if it works. Look at the output. Make sure the humanized content is what you expect. Then scale up to your full library.

The pipeline is a template. Customize it for your needs. Add fields to your CSV. Adjust the rate limit. Connect it to your database instead of CSV files. The core pattern stays the same.

Ready to Automate?

A content pipeline gives you superpowers. You can humanize your entire content library while you sleep. Your SEO improves. Your rankings climb. Your audience gets better content.

Get a free API key and documentation to build your pipeline today. Your first 10,000 words are free. No credit card required. Head to our pricing page to claim your free tier and start automating your content workflow.