Why Batch Process via API?
Photoroom and remove.bg charge extra for batch access. PixelAPI has no batch tier — just call the same endpoints in parallel loops. Your Python script can process your entire product catalog overnight for a few dollars.
| Task | Count | PixelAPI Cost | remove.bg Cost |
|---|---|---|---|
| Background removal | 500 images | $2.50 | $35–100 |
| AI shadow | 500 images | $5.00 | N/A (not offered) |
| 4x Upscale | 500 images | $2.50 | N/A |
| Full pipeline (BG + shadow) | 500 images | $7.50 | $35–100 + manual shadows |
Method 1: ThreadPoolExecutor (Simplest)
Best for: moderate batches (50–500 images), synchronous workflows.
import requests
import concurrent.futures
import pathlib
# NOTE(review): `pathlib` above is redundant — only `Path` below is used.
from pathlib import Path
# API credentials and endpoint — replace with your own key.
API_KEY = "YOUR_API_KEY"
BASE_URL = "https://api.pixelapi.dev"
def remove_background(img_path: Path) -> str:
    """Remove the background from one image via the API.

    Uploads the file, polls the generation endpoint until the job
    finishes, downloads the result into an ``output/`` folder next to
    the input, and returns the output path as a string.

    Raises:
        requests.HTTPError: if the upload or a poll request fails.
        RuntimeError: if the API reports the job as failed.
        TimeoutError: if the job does not finish within ~60 polls.
    """
    import time  # local import keeps this snippet self-contained

    with open(img_path, "rb") as f:
        response = requests.post(
            f"{BASE_URL}/v1/image/remove-background",
            headers={"Authorization": f"Bearer {API_KEY}"},
            files={"image": f},
            data={"output_format": "png"},
            timeout=60,  # don't hang forever on a dead connection
        )
    response.raise_for_status()
    gen_id = response.json()["generation_id"]

    # Poll until done (roughly 60 seconds max).
    for _ in range(60):
        poll = requests.get(
            f"{BASE_URL}/v1/generation/{gen_id}",
            headers={"Authorization": f"Bearer {API_KEY}"},
            timeout=30,
        )
        poll.raise_for_status()  # FIX: surface HTTP errors on polls too
        status = poll.json()
        if status["status"] == "completed":
            # Download the finished image into <input dir>/output/.
            out_path = img_path.parent / "output" / f"{img_path.stem}_nobg.png"
            out_path.parent.mkdir(parents=True, exist_ok=True)
            img_data = requests.get(status["output_url"], timeout=60).content
            out_path.write_bytes(img_data)
            return str(out_path)
        if status["status"] == "failed":
            raise RuntimeError(f"Failed: {status.get('error')}")
        time.sleep(1)
    # BUG FIX: the original fell off the loop and returned None silently
    # when the job never finished; make the timeout explicit.
    raise TimeoutError(f"Job {gen_id} did not finish within 60 polls")
# Collect every product image, fan the work out, then summarize.
patterns = ("*.jpg", "*.png")
images = [p for pat in patterns for p in Path("products/").glob(pat)]
print(f"Processing {len(images)} images...")

results = []
# Keep at most 10 requests in flight at once.
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    pending = {executor.submit(remove_background, img): img for img in images}
    for done in concurrent.futures.as_completed(pending):
        img = pending[done]
        try:
            out = done.result()
        except Exception as e:
            results.append(("ERR", img.name, str(e)))
            print(f" ❌ {img.name}: {e}")
        else:
            results.append(("OK", img.name, out))
            print(f" ✅ {img.name}")

ok = sum(1 for r in results if r[0] == "OK")
cost = ok * 0.005  # $0.005 per image
print(f"\n✅ Done: {ok}/{len(images)} images | Cost: ${cost:.2f}")
Method 2: Async/Await (Fastest — 10x+ speed)
Best for: large batches (500+ images), fastest possible throughput.
import asyncio
import aiohttp   # third-party: pip install aiohttp
import aiofiles  # third-party: pip install aiofiles
from pathlib import Path
# API credentials and concurrency tuning.
API_KEY = "YOUR_API_KEY"
BASE_URL = "https://api.pixelapi.dev"
MAX_CONCURRENT = 20  # 20 simultaneous requests
async def process_image(session: aiohttp.ClientSession,
                        img_path: Path,
                        semaphore: asyncio.Semaphore) -> dict:
    """Remove the background from one image asynchronously.

    Returns a dict with a "status" key: "ok" (plus "output" path),
    "failed" (plus the API "error"), or "timeout".
    """
    import mimetypes  # stdlib; used to send the correct MIME type per file

    async with semaphore:  # limit concurrent requests to MAX_CONCURRENT
        # Read the file and queue the job.
        async with aiofiles.open(img_path, "rb") as f:
            data = await f.read()

        form = aiohttp.FormData()
        # BUG FIX: the original hard-coded content_type="image/jpeg",
        # mislabeling every .png upload; derive it from the filename.
        mime = mimetypes.guess_type(img_path.name)[0] or "application/octet-stream"
        form.add_field("image", data, filename=img_path.name, content_type=mime)
        form.add_field("output_format", "png")

        async with session.post(
            f"{BASE_URL}/v1/image/remove-background",
            data=form,
            headers={"Authorization": f"Bearer {API_KEY}"}
        ) as resp:
            result = await resp.json()
        gen_id = result["generation_id"]

        # Poll for completion (~2 minutes max).
        for _ in range(120):
            await asyncio.sleep(1)
            async with session.get(
                f"{BASE_URL}/v1/generation/{gen_id}",
                headers={"Authorization": f"Bearer {API_KEY}"}
            ) as resp:
                status = await resp.json()

            if status["status"] == "completed":
                # Download and save the result next to the input.
                async with session.get(status["output_url"]) as resp:
                    img_bytes = await resp.read()
                out_path = img_path.parent / "output" / f"{img_path.stem}_nobg.png"
                out_path.parent.mkdir(parents=True, exist_ok=True)
                async with aiofiles.open(out_path, "wb") as f:
                    await f.write(img_bytes)
                return {"status": "ok", "file": img_path.name, "output": str(out_path)}
            if status["status"] == "failed":
                return {"status": "failed", "file": img_path.name, "error": status.get("error")}

        return {"status": "timeout", "file": img_path.name}
async def batch_process(image_dir: str = "products/"):
    """Fan background removal out across every image under *image_dir*.

    Recursively picks up .jpg and .png files, runs them through
    process_image with bounded concurrency, prints a summary, and
    returns the per-image result list (exceptions included).
    """
    root = Path(image_dir)
    images = [*root.glob("**/*.jpg"), *root.glob("**/*.png")]
    print(f"🚀 Processing {len(images)} images with {MAX_CONCURRENT} concurrent workers...")

    sem = asyncio.Semaphore(MAX_CONCURRENT)
    connector = aiohttp.TCPConnector(limit=MAX_CONCURRENT)
    async with aiohttp.ClientSession(connector=connector) as session:
        jobs = [process_image(session, img, sem) for img in images]
        results = await asyncio.gather(*jobs, return_exceptions=True)

    succeeded = [r for r in results if isinstance(r, dict) and r.get("status") == "ok"]
    ok = len(succeeded)
    print(f"\n✅ Complete: {ok}/{len(images)} | Cost: ${ok * 0.005:.2f}")
    return results

# Run it
asyncio.run(batch_process("products/"))
Method 3: Full Product Pipeline (BG + Shadow + Upscale)
The most common e-commerce workflow: remove background → add shadow → optional upscale.
import requests, time, concurrent.futures
from pathlib import Path
# Shared API configuration for the pipeline below.
API_KEY = "YOUR_API_KEY"
BASE = "https://api.pixelapi.dev"
HEADERS = {"Authorization": f"Bearer {API_KEY}"}  # reused on every request
def wait_for_job(gen_id: str, timeout: int = 120) -> dict:
    """Poll a generation job until it reaches a terminal state.

    Args:
        gen_id: generation id returned by a job-creation call.
        timeout: maximum seconds to wait (wall-clock).

    Returns:
        The final status payload ("completed" or "failed").

    Raises:
        TimeoutError: if the job is still pending after *timeout* seconds.
        requests.HTTPError: if a status request fails.
    """
    # FIX: use a wall-clock deadline. The original counted loop
    # iterations, so each HTTP round-trip silently stretched the
    # effective timeout well past `timeout` seconds.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        resp = requests.get(f"{BASE}/v1/generation/{gen_id}", headers=HEADERS, timeout=30)
        resp.raise_for_status()  # FIX: surface HTTP errors instead of KeyError later
        r = resp.json()
        if r["status"] in ("completed", "failed"):
            return r
        time.sleep(1)
    raise TimeoutError(f"Job {gen_id} timed out")
def full_pipeline(img_path: Path) -> str:
    """Run one image through the full pipeline: remove BG, then add shadow.

    Downloads the composited result to output/<stem>_final.png and
    returns that path.

    Raises:
        requests.HTTPError: if an API request fails at the HTTP level.
        RuntimeError: if either generation job reports failure.
        TimeoutError: via wait_for_job, if a job never finishes.
    """
    # Step 1: remove the background.
    with open(img_path, "rb") as f:
        r1 = requests.post(f"{BASE}/v1/image/remove-background",
                           headers=HEADERS,
                           files={"image": f},
                           data={"output_format": "png"})
    r1.raise_for_status()  # FIX: fail loudly before reading the JSON body
    r1_data = wait_for_job(r1.json()["generation_id"])
    if r1_data["status"] != "completed":
        raise RuntimeError(f"BG removal failed: {r1_data.get('error')}")
    nobg_url = r1_data["output_url"]

    # Step 2: add a realistic soft shadow to the cut-out.
    # (requests sets Content-Type: application/json automatically for json=.)
    r2 = requests.post(f"{BASE}/v1/image/add-shadow",
                       headers=HEADERS,
                       json={
                           "image_url": nobg_url,
                           "shadow_type": "soft",
                           "shadow_opacity": 0.5,
                           "shadow_blur": 20
                       })
    r2.raise_for_status()
    r2_data = wait_for_job(r2.json()["generation_id"])
    if r2_data["status"] != "completed":
        raise RuntimeError(f"Shadow failed: {r2_data.get('error')}")

    # Download the final result.
    final_url = r2_data["output_url"]
    out_path = Path("output") / f"{img_path.stem}_final.png"
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_bytes(requests.get(final_url).content)

    cost = 0.005 + 0.010  # BG ($0.005) + shadow ($0.010)
    print(f" ✅ {img_path.name} → {out_path.name} (${cost:.3f})")
    return str(out_path)
# Batch process the entire catalog, tolerating per-image failures.
images = list(Path("products/").glob("*.jpg"))
print(f"Processing {len(images)} images through full pipeline...")

results = []
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as pool:
    futures = {pool.submit(full_pipeline, img): img for img in images}
    for fut in concurrent.futures.as_completed(futures):
        try:
            results.append(fut.result())
        except Exception as e:
            # FIX: the original used pool.map(), which re-raises the first
            # exception and aborts the whole batch; one bad image should
            # not kill the run.
            print(f" ❌ {futures[fut].name}: {e}")

# Only successful images incur cost (BG + shadow per image).
total_cost = len(results) * 0.015
print(f"\n✅ Done! {len(results)} products ready | Total: ${total_cost:.2f}")
print(f" vs remove.bg + manual Photoshop: $35–100+ and hours of work")
Node.js Version
// Node.js batch client — same flow as the Python examples above.
// NOTE: node-fetch and form-data are npm packages, not Node built-ins.
const fs = require('fs');
const path = require('path');
const fetch = require('node-fetch');
const FormData = require('form-data');
// API credentials and endpoint — replace with your own key.
const API_KEY = 'YOUR_API_KEY';
const BASE = 'https://api.pixelapi.dev';
/**
 * Poll a generation job once per second until it reaches a terminal
 * state ('completed' or 'failed'). Resolves with the status payload,
 * or rejects with Error('Timeout') after `timeout` polls.
 */
async function waitForJob(genId, timeout = 120) {
  let attempt = 0;
  while (attempt < timeout) {
    await new Promise(resolve => setTimeout(resolve, 1000));
    const response = await fetch(`${BASE}/v1/generation/${genId}`,
      { headers: { Authorization: `Bearer ${API_KEY}` }});
    const payload = await response.json();
    if (payload.status === 'completed' || payload.status === 'failed') {
      return payload;
    }
    attempt += 1;
  }
  throw new Error('Timeout');
}
/**
 * Upload one image for background removal, then wait for the job to
 * finish. Resolves with the final generation status payload.
 */
async function removeBackground(imgPath) {
  const body = new FormData();
  body.append('image', fs.createReadStream(imgPath));
  body.append('output_format', 'png');

  const response = await fetch(`${BASE}/v1/image/remove-background`, {
    method: 'POST',
    headers: { Authorization: `Bearer ${API_KEY}`, ...body.getHeaders() },
    body
  });
  const json = await response.json();
  return waitForJob(json.generation_id);
}
// Batch process with concurrency limit (p-limit, npm package)
const pLimit = require('p-limit');
const limit = pLimit(10); // 10 concurrent

// FIX: the original crashed on the first writeFileSync if output/ was
// missing — create it up front.
fs.mkdirSync('output', { recursive: true });

const images = fs.readdirSync('products/').filter(f => f.match(/\.(jpg|png)$/));
console.log(`Processing ${images.length} images...`);

let okCount = 0; // FIX: only count images that actually completed
const tasks = images.map(img => limit(async () => {
  const result = await removeBackground(`products/${img}`);
  if (result.status === 'completed') {
    const imgBuf = await (await fetch(result.output_url)).buffer();
    fs.writeFileSync(`output/${path.basename(img, path.extname(img))}_nobg.png`, imgBuf);
    okCount += 1;
    console.log(` ✅ ${img}`);
  } else {
    // FIX: the original silently dropped failed jobs.
    console.log(` ❌ ${img}: ${result.error || result.status}`);
  }
}));

Promise.all(tasks).then(() => {
  // FIX: bill successful images only, not every file found on disk.
  const cost = okCount * 0.005;
  console.log(`\n✅ Done! Cost: $${cost.toFixed(2)}`);
});
💡 Pro tip: Set max_workers=10 for ThreadPoolExecutor or MAX_CONCURRENT=20 for async. Going higher won't help much — our API processes jobs in parallel across multiple GPU workers anyway.
⚠️ Rate limits: Free plan: 10 req/min. Starter: 60 req/min. Pro: 300 req/min. Scale: unlimited. For large batches, use the Scale plan ($200/mo for 300K credits) to process 20,000 images/day.
Cost Calculator
| Operation | Per Image | 500 Images | 10,000 Images |
|---|---|---|---|
| Background Removal | $0.005 | $2.50 | $50 |
| AI Shadow | $0.010 | $5.00 | $100 |
| 4x Upscale | $0.005 | $2.50 | $50 |
| Full Pipeline (BG+Shadow) | $0.015 | $7.50 | $150 |
Compare: remove.bg charges $35–100 for 500 images. Photoroom charges per API call with no bulk discount.
Start Batch Processing Today
Free plan includes 100 credits. No credit card required.
Start Free → API Docs