genblaze Developer Guide

    genblaze is a Python SDK for building AI media pipelines. You define what to generate, which provider runs it, and where outputs go. genblaze handles execution, retries, and storage, and produces a verifiable provenance record for every run.

    The genblaze SDK is open source. You can find the full source code, examples, and issue tracker on GitHub.

    This guide takes you from your first pipeline to running genblaze in real applications.

    Use genblaze if you want to:

    • Build repeatable AI workflows, not one-off API calls.

    • Chain models, such as image → video → audio.

    • Store outputs permanently.

    • Verify how an asset was generated.

    Overview

    genblaze is built around four core pieces:

    • Providers call AI models (OpenAI, Runway, Replicate, and others) and let you switch models without changing your code.

    • Pipelines define what happens (a single step or multiple steps chained together).

    • Storage saves outputs so they do not expire.

    • Manifests record what was generated and let you verify it later.

    Getting Started

    If you are new to genblaze:

    1. Install the SDK.

    2. Run your first pipeline locally.

    3. Add storage to make outputs permanent.

    Building Workflows

    As you build:

    • Use Providers to choose models.

    • Use Pipelines to chain steps together.

    • Use Storage to manage outputs and cost.

    • Use Manifests to verify results.

    Going to Production

    For production:

    • Use Streaming to track long-running jobs.

    • Tune Retry policy for cost and reliability.

    • Add Observability for logging and tracing.

    Prerequisites

    • Python 3.11+

    • At least one provider API key

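    Optional:

    • A Backblaze B2 bucket and application key, if you want durable storage

    • ffmpeg, for features such as video/audio muxing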

    Install

    Run this command in your terminal:

    pip install genblaze-core genblaze-openai genblaze-s3

    • genblaze-core: the core SDK

    • genblaze-openai: the OpenAI provider (DALL·E, Sora, etc.)

    • genblaze-s3: the storage backend (used for Backblaze B2 and other S3-compatible storage)

    Optional providers:

    Install only the additional providers you plan to use:

    pip install genblaze-replicate
    pip install genblaze-runway
    pip install genblaze-google
    pip install genblaze-luma

    Set an API Key

    Run the following command in your terminal:

    export OPENAI_API_KEY="your-api-key"

    Run a Pipeline

    Copy the code below into a file (for example, main.py) and run it with python main.py:

    from genblaze_core import Pipeline, Modality
    from genblaze_openai import DalleProvider
    
    pipeline = Pipeline("my-first-pipeline")
    
    result = pipeline.step(
        DalleProvider(output_dir="output/"),
        model="dall-e-3",
        prompt="a red fox sitting in a snowy forest, golden hour",
        modality=Modality.IMAGE,
    ).run()
    
    asset = result.run.steps[0].assets[0]
    
    print("Saved to:", asset.url)
    print("SHA256:", asset.sha256)

    Expected:

    • An image saved to output/

    • A file URL, such as file://...

    • A SHA-256 hash

    If this works, your setup is correct.
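
    To check the printed hash independently, you can recompute it with Python's standard library. The sketch below uses only hashlib, not genblaze; sha256_of_file is a hypothetical helper name, and it assumes asset.sha256 is the hex digest of the file's raw bytes.

```python
import hashlib


def sha256_of_file(path, chunk_size=1 << 20):
    """Return the hex SHA-256 digest of a file, reading it in chunks."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        # Read fixed-size chunks so large video files do not fill memory.
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()
```

    Under that assumption, calling sha256_of_file on the image saved in output/ should return the same string the pipeline printed.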

    Add Durable Storage

    Provider URLs expire. Storage copies each output to your own bucket, so the asset and its URL stay available.

    Set Your Backblaze B2 Credentials

    Run the following commands in your terminal:

    export B2_KEY_ID="your-key-id"
    export B2_APP_KEY="your-app-key"

    Update the Pipeline to Use Storage

    Copy the code below into your Python file (for example, main.py), replacing your previous pipeline:

    from genblaze_core import Pipeline, Modality, ObjectStorageSink, KeyStrategy
    from genblaze_openai import DalleProvider
    from genblaze_s3 import S3StorageBackend
    
    storage = ObjectStorageSink(
        S3StorageBackend.for_backblaze("my-bucket"),
        key_strategy=KeyStrategy.HIERARCHICAL,
    )
    
    result = (
        Pipeline("stored")
        .step(
            DalleProvider(),
            model="dall-e-3",
            prompt="a red fox",
            modality=Modality.IMAGE,
        )
        .run(sink=storage)
    )
    
    print("Permanent URL:", result.run.steps[0].assets[0].url)

    Now:

    • Assets are uploaded to your bucket.

    • URLs do not expire.

    • A manifest is stored with the outputs.

    Providers

    A provider is how you choose which AI model runs a step. Use different providers depending on what you want to generate (image, video, audio) or which model you want to use.

    Basic Usage

    from genblaze_core import Pipeline, Modality
    from genblaze_openai import DalleProvider, SoraProvider
    
    # Image
    Pipeline("img").step(
        DalleProvider(),
        model="dall-e-3",
        prompt="a fox",
        modality=Modality.IMAGE,
    ).run()
    
    # Video
    Pipeline("vid").step(
        SoraProvider(),
        model="sora-2",
        prompt="a canyon at dawn",
        modality=Modality.VIDEO,
    ).run()

    Fallback Models

    If a model fails, you can automatically try another one. This improves reliability without adding manual retry logic. The step records which model actually ran, so you can see if a fallback was used.

    from genblaze_core import Pipeline, Modality
    from genblaze_openai import SoraProvider
    
    Pipeline("fallback").step(
        SoraProvider(),
        model="sora-2",
        fallback_models=["sora-1"],
        prompt="a canyon at sunrise",
        modality=Modality.VIDEO,
    ).run()
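
    The fallback idea can be sketched in plain Python. This is not how genblaze implements fallback_models; run_with_fallback and the generate callable are hypothetical names used only to show the pattern of trying models in order and recording which one ran.

```python
def run_with_fallback(generate, prompt, model, fallback_models=()):
    """Try the primary model, then each fallback; return (model_used, output)."""
    last_error = None
    for candidate in [model, *fallback_models]:
        try:
            return candidate, generate(candidate, prompt)
        except Exception as exc:  # real code would catch narrower error types
            last_error = exc
    # Every candidate failed: surface the last underlying error as the cause.
    raise RuntimeError("all models failed") from last_error
```

    Returning the model name alongside the output mirrors the guide's point that you can see afterwards whether a fallback was used.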

    Pipelines

    Use pipelines to define the steps in your workflow. A pipeline can run one step, chain steps together, run independent steps at the same time, or repeat the same step across multiple inputs.

    Chaining Steps

    Use chaining when the output from one step should become the input to the next step. In this example, the first step generates an image. The second step uses that image to generate a video.

    from genblaze_core import Pipeline, Modality
    from genblaze_openai import DalleProvider, SoraProvider
    
    result = (
        Pipeline("image-to-video", chain=True)
        .step(DalleProvider(), model="dall-e-3", prompt="a fox", modality=Modality.IMAGE)
        .step(SoraProvider(), model="sora-2", prompt="slow zoom", modality=Modality.VIDEO)
        .run()
    )

    Parallel Execution

    Use parallel execution when steps do not depend on each other. In this example, the three image prompts can run at the same time because none of them needs output from another step.

    import asyncio
    from genblaze_core import Pipeline, Modality
    from genblaze_openai import DalleProvider
    
    async def run():
        result = await (
            Pipeline("parallel")
            .step(DalleProvider(), model="dall-e-3", prompt="a fox", modality=Modality.IMAGE)
            .step(DalleProvider(), model="dall-e-3", prompt="a wolf", modality=Modality.IMAGE)
            .step(DalleProvider(), model="dall-e-3", prompt="a bear", modality=Modality.IMAGE)
            .arun(max_concurrency=3)
        )
        print(len(result.run.steps))
    
    asyncio.run(run())
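
    A concurrency limit such as max_concurrency=3 is commonly implemented with a semaphore. The sketch below shows that general technique with asyncio only; it is not genblaze's scheduler, and run_bounded is a hypothetical helper.

```python
import asyncio


async def run_bounded(coro_factories, max_concurrency):
    """Run zero-argument coroutine factories with at most max_concurrency in flight."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(factory):
        # Each task waits here until one of the max_concurrency slots is free.
        async with semaphore:
            return await factory()

    # gather returns results in input order, regardless of completion order.
    return await asyncio.gather(*(guarded(f) for f in coro_factories))
```

    With three independent steps and a limit of three, all of them start immediately; a lower limit makes the extra steps wait for a free slot.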

    Batch Runs

    Use batch runs when you want to run the same pipeline for many inputs. In this example, the same image-generation step runs once for each prompt.

    from genblaze_core import Pipeline, Modality
    from genblaze_openai import DalleProvider
    
    results = Pipeline("batch").step(
        DalleProvider(),
        model="dall-e-3",
        prompt="{prompt}",
        modality=Modality.IMAGE,
    ).batch_run(
        prompts=[
            "a fox on a beach",
            "a fox in a forest",
            "a fox in a city",
        ],
        max_concurrency=3,
    )
    
    print(len(results))
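
    Conceptually, a batch run renders the prompt template once per input and executes the calls on a bounded pool. The sketch below assumes the {prompt} placeholder behaves like a str.format field, which this guide does not confirm; batch_map and run_one are hypothetical names, not part of genblaze.

```python
from concurrent.futures import ThreadPoolExecutor


def batch_map(run_one, template, prompts, max_concurrency=3):
    """Render the template once per prompt and run the calls on a bounded thread pool."""
    rendered = [template.format(prompt=p) for p in prompts]
    with ThreadPoolExecutor(max_workers=max_concurrency) as pool:
        # Executor.map yields results in input order, not completion order.
        return list(pool.map(run_one, rendered))
```

    Because results come back in input order, the result at index i corresponds to the prompt at index i.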

    Local Development with Cache

    Use caching during development to avoid re-running the same prompt. If the inputs have not changed, the cached result is returned instead of calling the provider again.

    from genblaze_core import Pipeline, StepCache, Modality
    from genblaze_openai import DalleProvider
    
    result = (
        Pipeline("dev")
        .cache(StepCache("cache/"))
        .step(DalleProvider(), model="dall-e-3", prompt="a fox", modality=Modality.IMAGE)
        .run()
    )
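
    The "same inputs, same result" behavior can be pictured as memoization keyed by the step's inputs. The class below is an in-memory illustration only; it is not genblaze's StepCache, whose on-disk format and key derivation are not described in this guide, and it assumes all parameter values are hashable.

```python
class InMemoryStepCache:
    """Memoize step outputs by their inputs (illustrative only)."""

    def __init__(self):
        self._store = {}
        self.hits = 0

    @staticmethod
    def _key(model, prompt, params):
        # Sort params so {"a": 1, "b": 2} and {"b": 2, "a": 1} map to one key.
        return (model, prompt, tuple(sorted(params.items())))

    def get_or_run(self, model, prompt, params, run):
        key = self._key(model, prompt, params)
        if key in self._store:
            self.hits += 1
            return self._store[key]
        # Cache miss: call the provider once and remember the result.
        result = self._store[key] = run(model, prompt, **params)
        return result
```

    Changing any input (model, prompt, or a parameter) produces a different key, so the provider is called again.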

    Storage

    Use storage to save your outputs so they do not expire. Without storage, provider URLs are temporary. With storage, your assets are uploaded to your bucket and stay available long-term.

    The example below connects your pipeline to a Backblaze B2 bucket. Pass the resulting storage object to pipeline.run(sink=storage) to store outputs:

    from genblaze_core import ObjectStorageSink, KeyStrategy
    from genblaze_s3 import S3StorageBackend
    
    storage = ObjectStorageSink(
        S3StorageBackend.for_backblaze("my-bucket"),
        key_strategy=KeyStrategy.HIERARCHICAL,
    )

    Key Strategies

    Key strategies control how files are organized in your bucket.

    • HIERARCHICAL

      • Organizes outputs by date and run ID.

      • Use this when you want outputs grouped by when they were created.

    storage = ObjectStorageSink(
        S3StorageBackend.for_backblaze("my-bucket"),
        key_strategy=KeyStrategy.HIERARCHICAL,
    )

    • CONTENT_ADDRESSABLE

      • Stores files based on their SHA-256 hash. Identical files are stored only once.

      • Use this when you want to avoid duplicate storage and reduce cost.

    storage = ObjectStorageSink(
        S3StorageBackend.for_backblaze("my-bucket"),
        key_strategy=KeyStrategy.CONTENT_ADDRESSABLE,
    )
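
    To make the difference between the two strategies concrete, here is a sketch of how object keys could be derived under each one. The exact key layouts genblaze writes are not shown in this guide, so the path shapes below (date/run ID/filename, and a sha256/ prefix) are assumptions, and both function names are hypothetical.

```python
import hashlib
from datetime import datetime, timezone
from pathlib import PurePosixPath


def hierarchical_key(run_id, filename, created):
    """Group objects by UTC date, then run ID (assumed layout)."""
    day = created.astimezone(timezone.utc).strftime("%Y/%m/%d")
    return f"{day}/{run_id}/{filename}"


def content_addressable_key(data, filename):
    """Name the object after the SHA-256 of its bytes, keeping the extension."""
    digest = hashlib.sha256(data).hexdigest()
    return f"sha256/{digest}{PurePosixPath(filename).suffix}"
```

    Two uploads with identical bytes get the same content-addressable key, which is why identical files are stored only once; under the hierarchical scheme they would land under separate run IDs.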

    Manifests

    Every pipeline run produces a manifest: a JSON record of what was generated and how.

    A manifest includes:

    • the prompt

    • the model and provider

    • the parameters used

    • a SHA-256 hash of the output

    This lets you verify that an asset has not changed since it was generated, and gives you the inputs you would need to regenerate it.

    How a manifest gets attached to an asset

    After a pipeline runs, genblaze embeds the manifest directly into the output file's metadata — an iTXt chunk for PNG, XMP for JPEG and WebP, a UUID box for MP4, ID3 tags for MP3, and so on. The file and its provenance record travel together as one artifact.

    If the format doesn't support embedded metadata, genblaze falls back to a sidecar .json file written alongside the asset.
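
    For PNG files, embedded text metadata lives in iTXt chunks, which you can read with the standard library. The sketch below parses the chunk layout defined by the PNG specification; it does not verify chunk CRCs, and it makes no assumption about which keyword genblaze uses for its manifest, since this guide does not say. read_png_itxt is a hypothetical helper.

```python
import struct
import zlib

PNG_SIGNATURE = b"\x89PNG\r\n\x1a\n"


def read_png_itxt(data):
    """Return {keyword: text} for every iTXt chunk in a PNG byte string."""
    if not data.startswith(PNG_SIGNATURE):
        raise ValueError("not a PNG file")
    found = {}
    pos = len(PNG_SIGNATURE)
    while pos + 8 <= len(data):
        # Each chunk is: 4-byte big-endian length, 4-byte type, data, 4-byte CRC.
        length, ctype = struct.unpack(">I4s", data[pos:pos + 8])
        body = data[pos + 8:pos + 8 + length]
        pos += 12 + length
        if ctype == b"IEND":
            break
        if ctype != b"iTXt":
            continue
        # iTXt body: keyword NUL, compression flag, compression method,
        # language tag NUL, translated keyword NUL, then the UTF-8 text.
        keyword, rest = body.split(b"\x00", 1)
        compressed = rest[0]
        _language, rest = rest[2:].split(b"\x00", 1)
        _translated, text = rest.split(b"\x00", 1)
        if compressed:
            text = zlib.decompress(text)
        found[keyword.decode("latin-1")] = text.decode("utf-8")
    return found
```

    Running this on the bytes of a generated PNG lists whatever text chunks it carries; the genblaze extract command described later in this guide remains the supported way to read the manifest.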

    Verify in Python

    You can access and verify the manifest directly from the pipeline result:

    from genblaze_core import Pipeline, Modality
    from genblaze_openai import DalleProvider
    
    result = Pipeline("verify").step(
        DalleProvider(),
        model="dall-e-3",
        prompt="a fox",
        modality=Modality.IMAGE,
    ).run()
    
    print(result.manifest.canonical_hash)
    print(result.manifest.verify())

    • canonical_hash is a fingerprint of the entire run.

    • verify() recomputes the hash and confirms integrity.
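
    The recompute-and-compare idea behind verification can be sketched with the standard library. genblaze's actual canonicalization rules are not documented in this guide; the sketch assumes a common approach (JSON with sorted keys and compact separators, hashed with SHA-256), and the function names are hypothetical.

```python
import hashlib
import json


def canonical_hash_of(record):
    """Hash a JSON-serializable record independently of key order."""
    canonical = json.dumps(
        record, sort_keys=True, separators=(",", ":"), ensure_ascii=False
    )
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def verify_record(record, expected_hash):
    """Recompute the hash and compare it with the stored value."""
    return canonical_hash_of(record) == expected_hash
```

    Any change to a recorded field, such as the prompt or model, changes the recomputed hash, so the comparison fails.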

    Verify from the CLI

    You can also verify assets later from the command line:

    genblaze verify my-image.png
    genblaze extract my-image.png

    • verify checks that the asset matches its manifest.

    • extract prints the manifest JSON.

    Agent Loops

    Use an agent loop when you want genblaze to keep trying until the output meets a quality bar. Each iteration runs a pipeline, evaluates the result, and can use that feedback to improve the next attempt.

    Use this pattern when you need automated refinement, such as improving product images, generating variations until one passes review, or applying a scoring model before accepting an output.

    from genblaze_core import (
        Pipeline,
        AgentLoop,
        AgentContext,
        CallableEvaluator,
        EvaluationResult,
        Modality,
    )
    from genblaze_openai import DalleProvider
    
    def build_pipeline(ctx: AgentContext) -> Pipeline:
        prompt = "a product shot on a white background, studio lighting"
        if ctx.last_evaluation and ctx.last_evaluation.feedback:
            prompt += f" — {ctx.last_evaluation.feedback}"
        return (
            Pipeline(f"iter-{ctx.iteration}")
            .step(
                DalleProvider(),
                model="dall-e-3",
                prompt=prompt,
                modality=Modality.IMAGE,
            )
        )
    
    def my_quality_scorer(image_url: str) -> float:
        # Placeholder: replace this with your own scorer, such as a vision model
        # call, a classifier, or any function that returns a score between 0.0 and 1.0.
        raise NotImplementedError("plug in your own scorer")
    
    def evaluate(result) -> EvaluationResult:
        image_url = result.run.steps[0].assets[0].url
        score = my_quality_scorer(image_url)
        return EvaluationResult(
            passed=score >= 0.8,
            score=score,
            feedback="increase contrast and add more shadow detail" if score < 0.8 else None,
        )
    
    loop = AgentLoop(
        build_pipeline,
        CallableEvaluator(evaluate),
        max_iterations=4,
    )
    out = loop.run()
    
    print(f"Passed: {out.passed}")
    print(f"Iterations: {len(out.iterations)}")
    print(f"Total cost: ${out.total_cost_usd:.2f}")

    Streaming Progress

    Some pipelines, especially video generation, can take several minutes to complete. Instead of waiting silently for a result, streaming lets you receive updates as each step starts, progresses, and finishes. Use this when you want to display progress in a UI, log updates during execution, or monitor long-running jobs without blocking until completion.

    from genblaze_core import Pipeline, Modality
    from genblaze_openai import SoraProvider
    
    pipe = (
        Pipeline("stream-demo")
        .step(
            SoraProvider(),
            model="sora-2",
            prompt="a canyon at dawn, slow aerial",
            modality=Modality.VIDEO,
            expected_duration_sec=120,
        )
    )
    
    for event in pipe.stream():
        match event.type:
            case "step.started":
                print(f"Started: {event.provider}/{event.model}")
    
            case "step.progress":
                if not event.is_heartbeat:
                    pct = f"{event.progress_pct:.0%}" if event.progress_pct is not None else "..."
                    print(f"Progress: {pct}")
    
            case "step.completed":
                print(f"Done in {event.elapsed_sec:.1f}s")
    
            case "pipeline.completed":
                print(f"Hash: {event.result.manifest.canonical_hash[:12]}")
    
            case "step.failed":
                print(f"Failed: {event.error}")

    Retry Policy

    genblaze automatically retries temporary failures such as timeouts, rate limits, and server errors.

    Use retry policies to control how aggressive those retries are, based on cost and reliability.

    • Conservative (2 attempts, 2s base backoff, 60s cap)
      Use for expensive providers like video generation, where a duplicate submission costs real money.

    • Aggressive (7 attempts, 0.5s base backoff, 15s cap)
      Use for cheap or idempotent calls where transient failures are common but retrying is low-cost.

    • Disabled (1 attempt, no retries)
      Use in tests so failures surface immediately with the real error.

    from genblaze_core.providers import RetryPolicy
    from genblaze_openai import DalleProvider, SoraProvider
    
    # Conservative: fewer retries, longer backoff.
    sora = SoraProvider(retry_policy=RetryPolicy.conservative())
    
    # Aggressive: more retries, shorter backoff.
    dalle = DalleProvider(retry_policy=RetryPolicy.aggressive())
    
    # Disabled: fail immediately.
    test_provider = DalleProvider(retry_policy=RetryPolicy.disabled())
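
    To get a feel for what the presets mean in wall-clock time, you can compute a backoff schedule from the attempt count, base delay, and cap listed above. The guide does not state the growth factor or whether jitter is applied, so this sketch assumes plain doubling with no jitter; backoff_delays is a hypothetical helper, not part of RetryPolicy.

```python
def backoff_delays(max_attempts, base_sec, cap_sec):
    """Delays before each retry, doubling from base_sec and clamped to cap_sec."""
    # max_attempts counts the first try, so there are max_attempts - 1 retries.
    return [min(cap_sec, base_sec * 2 ** i) for i in range(max_attempts - 1)]


conservative = backoff_delays(2, 2.0, 60.0)   # one retry
aggressive = backoff_delays(7, 0.5, 15.0)     # six retries
disabled = backoff_delays(1, 0.0, 0.0)        # no retries
```

    Under these assumptions the aggressive preset waits 0.5, 1, 2, 4, 8, and 15 seconds (30.5 seconds in total) across its six retries, while the conservative preset waits 2 seconds before its single retry.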

    Observability

    Use observability to understand what your pipeline is doing while it runs. genblaze can emit structured logs and tracing data so you can debug issues, monitor performance, and track runs in production.

    Use logging for simple visibility and debugging, and OpenTelemetry when you need distributed tracing or integration with monitoring tools.

    Structured Logging

    Use structured logging to emit JSON logs for each pipeline run and step. This is useful for debugging locally or sending logs to a logging system.

    from genblaze_core import Pipeline, LoggingTracer, Modality
    from genblaze_openai import DalleProvider
    
    result = (
        Pipeline("traced", tracer=LoggingTracer())
        .step(
            DalleProvider(),
            model="dall-e-3",
            prompt="a fox",
            modality=Modality.IMAGE,
        )
        .run()
    )
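
    If you want your own application logs in the same one-JSON-object-per-line style, Python's logging module can produce them. The sketch below is independent of LoggingTracer, whose output schema is not documented in this guide; the field names pipeline, step, and model are hypothetical.

```python
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render each log record as one JSON object per line."""

    def format(self, record):
        payload = {
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # Fields passed via logging's extra= argument become record attributes.
        for field in ("pipeline", "step", "model"):
            if hasattr(record, field):
                payload[field] = getattr(record, field)
        return json.dumps(payload)


def make_json_logger(name, stream=None):
    """Return a logger that writes JSON lines to stream (stderr by default)."""
    logger = logging.getLogger(name)
    handler = logging.StreamHandler(stream)
    handler.setFormatter(JsonFormatter())
    logger.handlers = [handler]
    logger.setLevel(logging.INFO)
    logger.propagate = False
    return logger
```

    A call such as logger.info("step completed", extra={"pipeline": "traced", "step": 0}) then emits a single line that a log aggregator can parse without custom rules.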

    OpenTelemetry

    Use OpenTelemetry to send traces to observability platforms (such as Datadog, Honeycomb, or Jaeger). This lets you track pipeline runs and step performance across your system.

    from genblaze_core import Pipeline, OTelTracer, Modality
    from genblaze_openai import DalleProvider
    
    tracer = OTelTracer(tracer_name="genblaze")
    
    Pipeline("otel-demo", tracer=tracer).step(
        DalleProvider(),
        model="dall-e-3",
        prompt="a fox",
        modality=Modality.IMAGE,
    ).run()

    Troubleshooting

    Import Errors

    Make sure you installed both genblaze-core and the provider package (for example, genblaze-openai).

    Authentication Errors

    Check that your API key is set in the current shell before running your script.
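
    One way to catch a missing key before any provider call is a small check at the top of your script. This is a plain-Python sketch; require_env is a hypothetical helper, not part of genblaze.

```python
import os


def require_env(name):
    """Return the variable's value, or fail early with a clear message."""
    value = os.environ.get(name, "").strip()
    if not value:
        raise RuntimeError(f"{name} is not set in this shell; export it before running")
    return value
```

    Calling require_env("OPENAI_API_KEY") at the start of main.py turns a provider-side authentication error into an immediate, readable failure.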

    Expired URLs

    Provider URLs are temporary. Add storage to make outputs permanent.

    ffmpeg Missing

    Some features (like video/audio muxing) require ffmpeg.

    Check if it is installed:

    ffmpeg -version

    If this fails, install ffmpeg and try again.
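
    The same check can be made from Python, for example at application startup. This sketch uses only the standard library; ffmpeg_version is a hypothetical helper name.

```python
import shutil
import subprocess


def ffmpeg_version():
    """Return ffmpeg's first version line, or None if it is not on PATH."""
    path = shutil.which("ffmpeg")
    if path is None:
        return None
    out = subprocess.run([path, "-version"], capture_output=True, text=True, check=False)
    lines = out.stdout.splitlines()
    return lines[0] if lines else None
```

    A None result means the features that depend on ffmpeg will not work until it is installed and visible on PATH.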

    Duplicate Jobs

    Retries may trigger the same request more than once.

    Use a conservative retry policy for expensive providers (like video generation).

    Production Checklist

    • Use durable storage.

    • Pick a key strategy.

    • Set timeouts.

    • Tune retries.

    • Enable tracing.

    • Store manifest hashes.

    • Use Object Lock if needed.

