genblaze Developer Guide
genblaze is a Python SDK for building AI media pipelines. You define what to generate, which provider runs it, and where outputs go. genblaze handles execution, retries, storage, and produces a verifiable provenance record for every run.
The genblaze SDK is open source. You can find the full source code, examples, and issue tracker on GitHub.
This guide takes you from your first pipeline to running genblaze in real applications.
Use genblaze if you want to:
Build repeatable AI workflows, not one-off API calls.
Chain models, such as image → video → audio.
Store outputs permanently.
Verify how an asset was generated.
Overview
genblaze is built around four core pieces:
Providers call AI models (OpenAI, Runway, Replicate, and others) and let you switch models without changing your code.
Pipelines define what happens (a single step or multiple steps chained together).
Storage saves outputs so they do not expire.
Manifests record what was generated and let you verify it later.
Getting Started
If you are new to genblaze:
Install the SDK.
Run your first pipeline locally.
Add storage to make outputs permanent.
Building Workflows
As you build:
Use Providers to choose models.
Use Pipelines to chain steps together.
Use Storage to manage outputs and cost.
Use Manifests to verify results.
Going to Production
For production:
Use Streaming to track long-running jobs.
Tune Retry policy for cost and reliability.
Add Observability for logging and tracing.
Prerequisites
Python 3.11+
At least one provider API key
Optional:
ffmpeg for muxing
OpenTelemetry or LangSmith for tracing
Storage is optional for getting started, but required if you want permanent, verifiable outputs.
Install
Run these commands in your terminal:
pip install genblaze-core genblaze-openai genblaze-s3

genblaze-core: Core SDK
genblaze-openai: OpenAI provider (DALL·E, Sora, etc.)
genblaze-s3: Storage backend (used for Backblaze B2 and other S3-compatible storage)
Optional providers:
Install additional providers as needed. Install only the providers you plan to use.
pip install genblaze-replicate
pip install genblaze-runway
pip install genblaze-google
pip install genblaze-luma

Set an API Key
Run the following command in your terminal:
export OPENAI_API_KEY="your-api-key"

Run a Pipeline
Copy the code below into a file (for example, main.py) and run it with:
python main.py

from genblaze_core import Pipeline, Modality
from genblaze_openai import DalleProvider

pipeline = Pipeline("my-first-pipeline")
result = pipeline.step(
    DalleProvider(output_dir="output/"),
    model="dall-e-3",
    prompt="a red fox sitting in a snowy forest, golden hour",
    modality=Modality.IMAGE,
).run()

asset = result.run.steps[0].assets[0]
print("Saved to:", asset.url)
print("SHA256:", asset.sha256)

Expected:
An image saved to output/
A file URL, such as file://...
A SHA-256 hash
If this works, your setup is correct.
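To double-check the printed hash yourself, you can recompute it from the saved file. This is a minimal sketch using only the Python standard library. The helper name `sha256_of_file_url` is hypothetical, and it assumes `asset.url` is a `file://` URL and `asset.sha256` is a lowercase hex digest of the file bytes.

```python
import hashlib
from pathlib import Path
from urllib.parse import urlparse
from urllib.request import url2pathname


def sha256_of_file_url(file_url: str) -> str:
    """Return the hex SHA-256 digest of the local file behind a file:// URL."""
    # Hypothetical helper, not part of genblaze.
    path = Path(url2pathname(urlparse(file_url).path))
    digest = hashlib.sha256()
    with path.open("rb") as fh:
        # Read in 1 MiB chunks so large video files do not need to fit in memory.
        for chunk in iter(lambda: fh.read(1 << 20), b""):
            digest.update(chunk)
    return digest.hexdigest()
```

Under those assumptions, `sha256_of_file_url(asset.url) == asset.sha256` should hold. This guide does not say whether the recorded hash is taken before or after metadata is embedded in the file, so a mismatch does not prove corruption on its own.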
Add Durable Storage
Provider URLs expire. Storage makes them permanent.
Set your Backblaze B2 Credentials
Run the following commands in your terminal:
export B2_KEY_ID="your-key-id"
export B2_APP_KEY="your-app-key"

Update the Pipeline to Use Storage
Copy the code below into your Python file (for example, main.py), replacing your previous pipeline:
from genblaze_core import Pipeline, Modality, ObjectStorageSink, KeyStrategy
from genblaze_openai import DalleProvider
from genblaze_s3 import S3StorageBackend

storage = ObjectStorageSink(
    S3StorageBackend.for_backblaze("my-bucket"),
    key_strategy=KeyStrategy.HIERARCHICAL,
)

result = (
    Pipeline("stored")
    .step(
        DalleProvider(),
        model="dall-e-3",
        prompt="a red fox",
        modality=Modality.IMAGE,
    )
    .run(sink=storage)
)
print("Permanent URL:", result.run.steps[0].assets[0].url)

Now:
Assets are uploaded to your bucket.
URLs do not expire.
A manifest is stored with the outputs.
Providers
A provider is how you choose which AI model runs a step. Use different providers depending on what you want to generate (image, video, audio) or which model you want to use.
Basic Usage
from genblaze_core import Pipeline, Modality
from genblaze_openai import DalleProvider, SoraProvider

# Image
Pipeline("img").step(
    DalleProvider(),
    model="dall-e-3",
    prompt="a fox",
    modality=Modality.IMAGE,
).run()

# Video
Pipeline("vid").step(
    SoraProvider(),
    model="sora-2",
    prompt="a canyon at dawn",
    modality=Modality.VIDEO,
).run()

Fallback Models
If a model fails, you can automatically try another one. This improves reliability without adding manual retry logic.
The step records which model actually ran, so you can see if a fallback was used.
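Conceptually, a fallback tries each model in order and reports which one produced the output. The sketch below shows that idea in plain Python. It is not genblaze's implementation, and `run_with_fallback` and `fake_generate` are hypothetical names used only for illustration.

```python
from typing import Callable, Sequence


def run_with_fallback(
    generate: Callable[[str], str],
    model: str,
    fallback_models: Sequence[str] = (),
) -> tuple[str, str]:
    """Try `model`, then each fallback in order.

    Returns (model_that_ran, output). Re-raises the last error if all fail.
    """
    last_error: Exception | None = None
    for candidate in (model, *fallback_models):
        try:
            return candidate, generate(candidate)
        except Exception as exc:  # a real implementation would catch narrower errors
            last_error = exc
    assert last_error is not None
    raise last_error


# Hypothetical provider call: pretend the primary model is unavailable.
def fake_generate(model: str) -> str:
    if model == "sora-2":
        raise RuntimeError("model unavailable")
    return f"video from {model}"


used_model, output = run_with_fallback(fake_generate, "sora-2", ["sora-1"])
print(used_model)  # sora-1
```

In genblaze you request the same behavior declaratively with the fallback_models argument: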
from genblaze_core import Pipeline, Modality
from genblaze_openai import SoraProvider

Pipeline("fallback").step(
    SoraProvider(),
    model="sora-2",
    fallback_models=["sora-1"],
    prompt="a canyon at sunrise",
    modality=Modality.VIDEO,
).run()

Pipelines
Use pipelines to define the steps in your workflow. A pipeline can run one step, chain steps together, run independent steps at the same time, or repeat the same step across multiple inputs.
Chaining Steps
Use chaining when the output from one step should become the input to the next step. In this example, the first step generates an image. The second step uses that image to generate a video.
from genblaze_core import Pipeline, Modality
from genblaze_openai import DalleProvider, SoraProvider

(
    Pipeline("image-to-video", chain=True)
    .step(DalleProvider(), model="dall-e-3", prompt="a fox", modality=Modality.IMAGE)
    .step(SoraProvider(), model="sora-2", prompt="slow zoom", modality=Modality.VIDEO)
    .run()
)

Parallel Execution
Use parallel execution when steps do not depend on each other. In this example, the three image prompts can run at the same time because none of them needs output from another step.
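A concurrency limit such as max_concurrency is commonly enforced with a semaphore. The sketch below shows that general technique with asyncio. It does not show how genblaze schedules steps internally, and `run_bounded` is a hypothetical helper in which a short sleep stands in for a provider call.

```python
import asyncio


async def run_bounded(prompts: list[str], max_concurrency: int) -> tuple[list[str], int]:
    """Run one fake job per prompt, never more than max_concurrency at once."""
    semaphore = asyncio.Semaphore(max_concurrency)
    running = 0
    peak = 0

    async def job(prompt: str) -> str:
        nonlocal running, peak
        async with semaphore:
            running += 1
            peak = max(peak, running)  # track the highest number of simultaneous jobs
            await asyncio.sleep(0.01)  # stands in for a provider call
            running -= 1
            return f"image of {prompt}"

    # gather returns results in the same order as the prompts.
    results = await asyncio.gather(*(job(p) for p in prompts))
    return list(results), peak


results, peak = asyncio.run(
    run_bounded(["a fox", "a wolf", "a bear", "an owl"], max_concurrency=3)
)
print(len(results), peak <= 3)
```

With four prompts and a limit of three, the fourth job waits until one of the first three finishes.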
import asyncio

from genblaze_core import Pipeline, Modality
from genblaze_openai import DalleProvider


async def run():
    result = await (
        Pipeline("parallel")
        .step(DalleProvider(), model="dall-e-3", prompt="a fox", modality=Modality.IMAGE)
        .step(DalleProvider(), model="dall-e-3", prompt="a wolf", modality=Modality.IMAGE)
        .step(DalleProvider(), model="dall-e-3", prompt="a bear", modality=Modality.IMAGE)
        .arun(max_concurrency=3)
    )
    print(len(result.run.steps))


asyncio.run(run())

Batch Runs
Use batch runs when you want to run the same pipeline for many inputs. In this example, the same image-generation step runs once for each prompt.
from genblaze_core import Pipeline, Modality
from genblaze_openai import DalleProvider

results = Pipeline("batch").step(
    DalleProvider(),
    model="dall-e-3",
    prompt="{prompt}",
    modality=Modality.IMAGE,
).batch_run(
    prompts=[
        "a fox on a beach",
        "a fox in a forest",
        "a fox in a city",
    ],
    max_concurrency=3,
)
print(len(results))

Local Development with Cache
Use caching during development to avoid re-running the same prompt. If the inputs have not changed, the cached result is returned instead of calling the provider again.
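One common way to decide whether "the inputs have not changed" is to hash them into a cache key. The sketch below illustrates that idea only. How StepCache derives its keys is not documented here, so the `cache_key` function and the fields it hashes are assumptions.

```python
import hashlib
import json


def cache_key(provider: str, model: str, prompt: str, **params) -> str:
    """Derive a stable key from the step inputs (hypothetical scheme)."""
    payload = {"provider": provider, "model": model, "prompt": prompt, "params": params}
    # sort_keys makes the JSON, and therefore the key, independent of argument order.
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


same_a = cache_key("openai", "dall-e-3", "a fox", size="1024x1024", quality="hd")
same_b = cache_key("openai", "dall-e-3", "a fox", quality="hd", size="1024x1024")
changed = cache_key("openai", "dall-e-3", "a wolf", size="1024x1024", quality="hd")
print(same_a == same_b)   # True
print(same_a == changed)  # False
```

Identical inputs map to the same key and can be served from the cache, while any change to the prompt, model, or parameters produces a new key and a fresh provider call.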
from genblaze_core import Pipeline, StepCache, Modality
from genblaze_openai import DalleProvider

(
    Pipeline("dev")
    .cache(StepCache("cache/"))
    .step(DalleProvider(), model="dall-e-3", prompt="a fox", modality=Modality.IMAGE)
    .run()
)

Storage
Use storage to save your outputs so they do not expire. Without storage, provider URLs are temporary. With storage, your assets are uploaded to your bucket and available long-term. This example connects your pipeline to a Backblaze B2 bucket:
from genblaze_core import ObjectStorageSink, KeyStrategy
from genblaze_s3 import S3StorageBackend

storage = ObjectStorageSink(
    S3StorageBackend.for_backblaze("my-bucket"),
    key_strategy=KeyStrategy.HIERARCHICAL,
)

Pass this storage object to pipeline.run(sink=storage) to store outputs.
Key Strategies
Key strategies control how files are organized in your bucket.
HIERARCHICAL
Organizes outputs by date and run ID.
Use this when you want outputs grouped by when they were created.
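As a rough illustration of a date-and-run-ID layout, the sketch below builds an object key from those two parts. The exact path format genblaze uses is not shown in this guide, so the `hierarchical_key` helper and its date/run-id/filename layout are assumptions.

```python
from datetime import date


def hierarchical_key(run_date: date, run_id: str, filename: str) -> str:
    """Build a key grouped by date, then run ID (hypothetical layout)."""
    return f"{run_date:%Y/%m/%d}/{run_id}/{filename}"


key = hierarchical_key(date(2025, 1, 31), "run-123", "step-0.png")
print(key)  # 2025/01/31/run-123/step-0.png
```

A layout like this makes it easy to list or expire everything from a given day or run by key prefix.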
storage = ObjectStorageSink(
    S3StorageBackend.for_backblaze("my-bucket"),
    key_strategy=KeyStrategy.HIERARCHICAL,
)

CONTENT_ADDRESSABLE
Stores files based on their SHA-256 hash. Identical files are stored only once.
Use this when you want to avoid duplicate storage and reduce cost.
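The deduplication follows directly from using the hash as the key: identical bytes always map to the same object. The sketch below demonstrates this with a dictionary standing in for a bucket. The `content_key` helper and its key layout are assumptions, not genblaze's actual naming scheme.

```python
import hashlib


def content_key(data: bytes, extension: str) -> str:
    """Build an object key from the SHA-256 of the file contents (hypothetical layout)."""
    digest = hashlib.sha256(data).hexdigest()
    return f"{digest[:2]}/{digest}.{extension}"


bucket: dict[str, bytes] = {}  # stands in for an object store
for payload in (b"fox image bytes", b"fox image bytes", b"wolf image bytes"):
    bucket[content_key(payload, "png")] = payload  # identical bytes reuse the same key

print(len(bucket))  # 2
```

Three uploads produce only two stored objects because two of the payloads are byte-for-byte identical.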
storage = ObjectStorageSink(
    S3StorageBackend.for_backblaze("my-bucket"),
    key_strategy=KeyStrategy.CONTENT_ADDRESSABLE,
)

Manifests
Every pipeline run produces a manifest: a JSON record of what was generated and how.
A manifest includes:
the prompt
the model and provider
the parameters used
a SHA-256 hash of the output
This lets you verify that an asset has not changed and can be reproduced.
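To make the verification idea concrete, the sketch below hashes a manifest-like record in a canonical JSON form and shows that any edit changes the hash. The field names and the canonicalization rules (sorted keys, compact separators) are assumptions for illustration; genblaze's real manifest schema and hashing rules may differ.

```python
import hashlib
import json


def canonical_hash(manifest: dict) -> str:
    """Hash a manifest in a canonical JSON form (hypothetical scheme)."""
    canonical = json.dumps(
        manifest, sort_keys=True, separators=(",", ":"), ensure_ascii=False
    )
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


# Hypothetical manifest with the four kinds of fields listed above.
manifest = {
    "prompt": "a fox",
    "provider": "openai",
    "model": "dall-e-3",
    "parameters": {"size": "1024x1024"},
    "asset_sha256": hashlib.sha256(b"image bytes").hexdigest(),
}
recorded = canonical_hash(manifest)

# Unchanged manifest: recomputing gives the recorded hash.
print(canonical_hash(manifest) == recorded)  # True

# Any edit, even to one field, changes the hash.
tampered = {**manifest, "prompt": "a wolf"}
print(canonical_hash(tampered) == recorded)  # False
```

Because the record includes the asset's own SHA-256, changing either the file or its description breaks the match.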
How a manifest gets attached to an asset
After a pipeline runs, genblaze embeds the manifest directly into the output file's metadata — an iTXt chunk for PNG, XMP for JPEG and WebP, a UUID box for MP4, ID3 tags for MP3, and so on. The file and its provenance record travel together as one artifact.
If the format doesn't support embedded metadata, genblaze falls back to a sidecar .json file written alongside the asset.
Verify in Python
You can access and verify the manifest directly from the pipeline result:
from genblaze_core import Pipeline, Modality
from genblaze_openai import DalleProvider

result = Pipeline("verify").step(
    DalleProvider(),
    model="dall-e-3",
    prompt="a fox",
    modality=Modality.IMAGE,
).run()

print(result.manifest.canonical_hash)
print(result.manifest.verify())

canonical_hash is a fingerprint of the entire run.
verify() recomputes the hash and confirms integrity.
Verify from the CLI
You can also verify assets later from the command line:
genblaze verify my-image.png
genblaze extract my-image.png

verify checks that the asset matches its manifest.
extract prints the manifest JSON.
Agent Loops
Use an agent loop when you want genblaze to keep trying until the output meets a quality bar. Each iteration runs a pipeline, evaluates the result, and can use that feedback to improve the next attempt.
Use this pattern when you need automated refinement, such as improving product images, generating variations until one passes review, or applying a scoring model before accepting an output.
from genblaze_core import (
    Pipeline,
    AgentLoop,
    AgentContext,
    CallableEvaluator,
    EvaluationResult,
    Modality,
)
from genblaze_openai import DalleProvider


def build_pipeline(ctx: AgentContext) -> Pipeline:
    prompt = "a product shot on a white background, studio lighting"
    # Fold feedback from the previous attempt into the next prompt.
    if ctx.last_evaluation and ctx.last_evaluation.feedback:
        prompt += f" - {ctx.last_evaluation.feedback}"
    return (
        Pipeline(f"iter-{ctx.iteration}")
        .step(
            DalleProvider(),
            model="dall-e-3",
            prompt=prompt,
            modality=Modality.IMAGE,
        )
    )


def evaluate(result) -> EvaluationResult:
    image_url = result.run.steps[0].assets[0].url
    # my_quality_scorer is not part of genblaze. Supply your own scorer:
    # a vision model call, a classifier, or any function that returns
    # a score between 0.0 and 1.0.
    score = my_quality_scorer(image_url)
    return EvaluationResult(
        passed=score >= 0.8,
        score=score,
        feedback="increase contrast and add more shadow detail" if score < 0.8 else None,
    )


loop = AgentLoop(
    build_pipeline,
    CallableEvaluator(evaluate),
    max_iterations=4,
)
out = loop.run()

print(f"Passed: {out.passed}")
print(f"Iterations: {len(out.iterations)}")
print(f"Total cost: ${out.total_cost_usd:.2f}")

Streaming Progress
Some pipelines, especially video generation, can take several minutes to complete. Instead of waiting silently for a result, streaming lets you receive updates as each step starts, progresses, and finishes. Use this when you want to display progress in a UI, log updates during execution, or monitor long-running jobs without blocking until completion.
from genblaze_core import Pipeline, Modality
from genblaze_openai import SoraProvider

pipe = (
    Pipeline("stream-demo")
    .step(
        SoraProvider(),
        model="sora-2",
        prompt="a canyon at dawn, slow aerial",
        modality=Modality.VIDEO,
        expected_duration_sec=120,
    )
)

for event in pipe.stream():
    match event.type:
        case "step.started":
            print(f"Started: {event.provider}/{event.model}")
        case "step.progress":
            if not event.is_heartbeat:
                pct = f"{event.progress_pct:.0%}" if event.progress_pct else "..."
                print(f"Progress: {pct}")
        case "step.completed":
            print(f"Done in {event.elapsed_sec:.1f}s")
        case "pipeline.completed":
            print(f"Hash: {event.result.manifest.canonical_hash[:12]}")
        case "step.failed":
            print(f"Failed: {event.error}")

Retry Policy
genblaze automatically retries temporary failures such as timeouts, rate limits, and server errors.
Use retry policies to control how aggressive those retries are, based on cost and reliability.
Conservative (2 attempts, 2s base backoff, 60s cap)
Use for expensive providers like video generation, where a duplicate submission costs real money.
Aggressive (7 attempts, 0.5s base backoff, 15s cap)
Use for cheap or idempotent calls where transient failures are common but retrying is low-cost.
Disabled (1 attempt, no retries)
Use in tests so failures surface immediately with the real error.
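To get a feel for what these numbers mean in practice, the sketch below computes the waits between attempts for each preset. It assumes plain exponential backoff, where the wait doubles after each failure up to the cap. This guide does not state the growth factor or whether jitter is applied, so treat `backoff_delays` as a hypothetical approximation.

```python
def backoff_delays(attempts: int, base_sec: float, cap_sec: float) -> list[float]:
    """Delays between attempts, assuming the wait doubles after each failure."""
    # N attempts means at most N - 1 waits.
    return [min(cap_sec, base_sec * 2**i) for i in range(attempts - 1)]


print(backoff_delays(2, 2.0, 60.0))  # [2.0]
print(backoff_delays(7, 0.5, 15.0))  # [0.5, 1.0, 2.0, 4.0, 8.0, 15.0]
print(backoff_delays(1, 0.0, 0.0))   # []
```

Under this assumption, the aggressive preset can spend about 30 seconds waiting across six retries, while the conservative preset retries once after 2 seconds. The presets are applied per provider: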
from genblaze_core.providers import RetryPolicy
from genblaze_openai import DalleProvider, SoraProvider

# Conservative: fewer retries, longer backoff.
sora = SoraProvider(retry_policy=RetryPolicy.conservative())

# Aggressive: more retries, shorter backoff.
dalle = DalleProvider(retry_policy=RetryPolicy.aggressive())

# Disabled: fail immediately.
test_provider = DalleProvider(retry_policy=RetryPolicy.disabled())

Observability
Use observability to understand what your pipeline is doing while it runs. genblaze can emit structured logs and tracing data so you can debug issues, monitor performance, and track runs in production.
Use logging for simple visibility and debugging, and OpenTelemetry when you need distributed tracing or integration with monitoring tools.
Structured Logging
Use structured logging to emit JSON logs for each pipeline run and step. This is useful for debugging locally or sending logs to a logging system.
from genblaze_core import Pipeline, LoggingTracer, Modality
from genblaze_openai import DalleProvider

result = (
    Pipeline("traced", tracer=LoggingTracer())
    .step(
        DalleProvider(),
        model="dall-e-3",
        prompt="a fox",
        modality=Modality.IMAGE,
    )
    .run()
)

OpenTelemetry
Use OpenTelemetry to send traces to observability platforms (such as Datadog, Honeycomb, or Jaeger). This lets you track pipeline runs and step performance across your system.
from genblaze_core import Pipeline, OTelTracer, Modality
from genblaze_openai import DalleProvider

tracer = OTelTracer(tracer_name="genblaze")

Pipeline("otel-demo", tracer=tracer).step(
    DalleProvider(),
    model="dall-e-3",
    prompt="a fox",
    modality=Modality.IMAGE,
).run()

Troubleshooting
Import Errors
Make sure you installed both genblaze-core and the provider package (for example, genblaze-openai).
Authentication Errors
Check that your API key is set in the current shell before running your script.
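A quick way to confirm this from Python is to check the environment before building a pipeline. The sketch below uses the variable names from this guide; `missing_env_vars` is a hypothetical helper, not a genblaze function.

```python
import os


def missing_env_vars(names: list[str]) -> list[str]:
    """Return the names that are unset or empty in the current environment."""
    return [name for name in names if not os.environ.get(name)]


missing = missing_env_vars(["OPENAI_API_KEY", "B2_KEY_ID", "B2_APP_KEY"])
if missing:
    print("Missing environment variables:", ", ".join(missing))
```

Variables exported in one terminal are not visible in another, so run this check in the same shell that runs your script.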
Expired URLs
Provider URLs are temporary. Add storage to make outputs permanent.
ffmpeg Missing
Some features (like video/audio muxing) require ffmpeg.
Check if it is installed:
ffmpeg -version
If this fails, install ffmpeg and try again.
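You can run the same check from Python before starting a pipeline that needs muxing. This is a small sketch using the standard library; `ffmpeg_available` is a hypothetical helper, and it only confirms that an ffmpeg executable is on your PATH, not that it supports the codecs you need.

```python
import shutil


def ffmpeg_available() -> bool:
    """Return True if an ffmpeg executable is found on the PATH."""
    return shutil.which("ffmpeg") is not None


if not ffmpeg_available():
    print("ffmpeg not found: install it before using muxing features")
```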
Duplicate Jobs
Retries may trigger the same request more than once.
Use a conservative retry policy for expensive providers (like video generation).
Production Checklist
Use durable storage.
Pick a key strategy.
Set timeouts.
Tune retries.
Enable tracing.
Store manifest hashes.
Use Object Lock if needed.