
Provider & Model Registration

Provider is an abstract base class that encapsulates an upstream AI provider's actions and connection info. Subclass it to declare actions, use .model() to produce a ModelConfig, then pass that config to AIService.use().

Quick Start: OpenAI-compatible Text Provider

Extend Provider and override createClient to get default text / stream actions:

import { createOpenAICompatible } from "@ai-sdk/openai-compatible";
import { AIService, Provider, type OpenAICompatibleClientConfig } from "@downcity/city";

class DeepSeekProvider extends Provider {
  constructor() {
    super({
      id: "deepseek",
      envKey: "DEEPSEEK_API_KEY",
      baseURL: "https://api.deepseek.com/v1",
      passthroughModel: "deepseek-chat",
    });
  }

  protected createClient({ apiKey, baseURL }: OpenAICompatibleClientConfig) {
    return createOpenAICompatible({ apiKey, baseURL, name: "deepseek" });
  }
}

const deepseek = new DeepSeekProvider();

const ai = new AIService();
ai.use(deepseek.model({ id: "deepseek-v4-flash", name: "DeepSeek V4 Flash" }));
// `base` is assumed to be the host instance created elsewhere in your app.
base.use(ai);

This registers the following routes automatically:

POST /v1/ai/text              → SDK path
POST /v1/ai/stream            → SDK path
POST /v1/ai/chat/completions  → auto-passthrough to DeepSeek
GET  /v1/ai/models            → model catalog

passthroughModel is the upstream model id used during auto-passthrough.

Overriding text / stream

If the default OpenAI-compatible implementation is not enough, override text / stream directly:

import { Provider, type Context, type AIProviderChargedOutput } from "@downcity/city";
import type { UIMessage } from "ai";

class CustomTextProvider extends Provider {
  constructor() {
    super({ id: "custom", envKey: "CUSTOM_API_KEY" });
  }

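  async text(ctx: Context): Promise<AIProviderChargedOutput<UIMessage>> {
    // Custom implementation: call the upstream and return the output.
    throw new Error("text() not implemented");
  }

  async stream(ctx: Context) {
    // Custom streaming implementation.
    throw new Error("stream() not implemented");
  }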
}

Multimodal prompt extraction

Override extractPrompt to flatten multimodal messages into plain text:

class DeepSeekProvider extends Provider {
  // ...
  protected extractPrompt(input: { prompt?: string; messages?: Array<{ role: string; parts?: Array<{ type: string; text?: string }> }> }): string {
    if (typeof input.prompt === "string") return input.prompt;
    return input.messages
      ?.find((m) => m.role === "user")
      ?.parts?.map((part) => (part.type === "text" ? part.text : ""))
      .filter(Boolean)
      .join("\n") ?? "";
  }
}

Model fallback

You can attach a fallback model directly on a model config:

const kimi = kimiProvider.model({
  id: "kimi-k2.6",
  name: "Kimi K2.6",
});

const deepseek = deepseekProvider.model({
  id: "deepseek-v4-flash",
  name: "DeepSeek V4 Flash",
  fallback: [
    {
      match: (media) => media.media_type.startsWith("image/"),
      model: kimi,
    },
  ],
});

When a request contains file media input that the current model cannot handle, AIService evaluates fallback rules in array order for text, stream, and chat/completions. The first matching rule whose target model is available becomes the execution model.

For UIMessage requests, AIService only extracts media from { type: "file" } parts and passes media.media_type, media.filename, and media.url into match. Image, audio, video, PDF, and custom routing should all live in your match functions. For OpenAI-compatible requests, image_url / input_image parts are normalized as media.media_type === "image/*" and use the same rule list.
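The extraction described above can be pictured with a small standalone sketch. The type and function names here (FallbackMedia, mediaFromUIParts, mediaFromOpenAIParts) are hypothetical and not part of @downcity/city, and passing url through for OpenAI-compatible parts is an assumption; the sketch only shows how both request styles could end up in the same media shape that match receives.

```typescript
// Hypothetical shapes for illustration; not the actual @downcity/city types.
type FallbackMedia = { media_type: string; filename?: string; url?: string };

type UIPartLike =
  | { type: "text"; text: string }
  | { type: "file"; mediaType: string; filename?: string; url: string };

type OpenAIPartLike =
  | { type: "text"; text: string }
  | { type: "image_url"; image_url: { url: string } }
  | { type: "input_image"; image_url: string };

// UIMessage requests: only { type: "file" } parts contribute media.
function mediaFromUIParts(parts: UIPartLike[]): FallbackMedia[] {
  const found: FallbackMedia[] = [];
  for (const part of parts) {
    if (part.type === "file") {
      found.push({ media_type: part.mediaType, filename: part.filename, url: part.url });
    }
  }
  return found;
}

// OpenAI-compatible requests: image parts are normalized to "image/*".
function mediaFromOpenAIParts(parts: OpenAIPartLike[]): FallbackMedia[] {
  const found: FallbackMedia[] = [];
  for (const part of parts) {
    if (part.type === "image_url") {
      found.push({ media_type: "image/*", url: part.image_url.url });
    } else if (part.type === "input_image") {
      found.push({ media_type: "image/*", url: part.image_url });
    }
  }
  return found;
}
```

Because "image/*" starts with "image/", a rule such as media.media_type.startsWith("image/") matches media from either request style.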

If no fallback is configured, or no rule matches, AIService keeps the original model and the provider decides how to handle the request.
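A minimal sketch of the selection rule follows, assuming the request's media has already been extracted. ModelRef, FallbackRule, pickExecutionModel, and the isAvailable callback are hypothetical names for illustration; the sketch also skips the check for whether the current model can handle the media itself, which the real AIService performs first.

```typescript
// Hypothetical shapes for illustration; not the actual @downcity/city types.
type MediaInput = { media_type: string; filename?: string; url?: string };
type ModelRef = { id: string; fallback?: FallbackRule[] };
type FallbackRule = { match: (media: MediaInput) => boolean; model: ModelRef };

function pickExecutionModel(
  current: ModelRef,
  media: MediaInput[],
  isAvailable: (model: ModelRef) => boolean,
): ModelRef {
  // Rules are evaluated in array order; the first match with an available target wins.
  for (const rule of current.fallback ?? []) {
    if (media.some((item) => rule.match(item)) && isAvailable(rule.model)) {
      return rule.model;
    }
  }
  // No fallback configured, or no rule matched: keep the original model.
  return current;
}
```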

Image Providers

@downcity/city does not ship concrete image provider factories. City provides the Provider abstraction, AIService registration, and the unified client.ai.image_create() / client.ai.image_result() job contract.

Image job storage is owned by AIService: it declares the built-in async_jobs table, and Federation creates that table automatically during service initialization. A Provider only creates upstream work, queries upstream status, and normalizes upstream responses into the fixed protocol.

Normalization means the Provider must not pass an upstream raw response to AIService and expect AIService to guess fields. Different upstreams may put status in status, data.status, or result.status; others may represent pending or success with status_code + err + data. The Provider must translate those shapes inside image_fetch() into the only result shape Downcity understands:

type AIImageProviderFetchResult = {
  job_id: string;
  status: "queued" | "running" | "succeeded" | "failed";
  result?: UIMessage;
  error?: string;
  message?: string;
  poll_after_ms?: number;
  metadata?: Record<string, unknown>;
};

For example, when an upstream returns result pending, the Provider should return status: "running", a clear message, the next poll_after_ms, and a raw response summary in metadata.raw or an equivalent field. When the upstream succeeds, the Provider must return status: "succeeded" and an AI SDK UIMessage with file parts that can be materialized. When the upstream fails, the Provider must return status: "failed" and a user-visible error.
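To make the mapping concrete, here is a sketch of a status normalizer a Provider might call inside image_fetch(). The upstream vocabulary (pending, processing, success, completed, error), the UpstreamShape type, and the rule that status_code 0 with data means success are all assumptions about an imaginary upstream; a real Provider would encode its own upstream's conventions. Unknown status words are treated as still running, so the job stays pending rather than being guessed as finished.

```typescript
type NormalizedStatus = "queued" | "running" | "succeeded" | "failed";

// Hypothetical upstream vocabulary mapped onto the four fixed statuses.
const STATUS_WORDS = new Map<string, NormalizedStatus>([
  ["queued", "queued"],
  ["pending", "running"],
  ["processing", "running"],
  ["running", "running"],
  ["success", "succeeded"],
  ["succeeded", "succeeded"],
  ["completed", "succeeded"],
  ["failed", "failed"],
  ["error", "failed"],
]);

// Hypothetical union of the response shapes mentioned in the text.
type UpstreamShape = {
  status?: string;
  data?: { status?: string } | null;
  result?: { status?: string };
  status_code?: number;
  err?: string | null;
};

function normalizeUpstreamStatus(raw: UpstreamShape): NormalizedStatus {
  // Style 1: a status word in status, data.status, or result.status.
  const word = raw.status ?? raw.data?.status ?? raw.result?.status;
  if (word !== undefined) {
    return STATUS_WORDS.get(word.toLowerCase()) ?? "running";
  }
  // Style 2: status_code + err + data (assumed: err set means failure,
  // code 0 with data present means success, anything else is still pending).
  if (raw.err) return "failed";
  if (raw.status_code === 0 && raw.data) return "succeeded";
  return "running";
}
```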

In short: AIService owns job storage, Queue scheduling, state caching, and billing; the Provider owns upstream protocol parsing, pending/succeeded/failed mapping, usage extraction, and image result conversion.

AIService also provides a platform-level pending fallback for image jobs: when a job stays queued or running longer than image_max_pending_duration_ms, it is marked failed, and both error and message are set to upstream timeout. The default is 2 hours. Providers do not need to decide whether the Downcity job has timed out; while the upstream is still pending, keep returning queued or running.
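The timeout rule can be sketched as a pure function. This is not the AIService implementation: the PendingJob shape and its created_at_ms field are hypothetical, and only the 2-hour default, the parameter name image_max_pending_duration_ms, and the upstream timeout text come from the description above.

```typescript
type PendingStatus = "queued" | "running" | "succeeded" | "failed";

// Hypothetical job shape; the real async_jobs record may differ.
type PendingJob = {
  status: PendingStatus;
  created_at_ms: number;
  error?: string;
  message?: string;
};

const DEFAULT_MAX_PENDING_MS = 2 * 60 * 60 * 1000; // 2 hours

function applyPendingTimeout(
  job: PendingJob,
  now_ms: number,
  image_max_pending_duration_ms: number = DEFAULT_MAX_PENDING_MS,
): PendingJob {
  const pending = job.status === "queued" || job.status === "running";
  // Terminal jobs and jobs still inside the window are left untouched.
  if (!pending || now_ms - job.created_at_ms <= image_max_pending_duration_ms) {
    return job;
  }
  return { ...job, status: "failed", error: "upstream timeout", message: "upstream timeout" };
}
```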

Complete Example

import {
  AIService,
  Provider,
  type AIImageProviderCreateResult,
  type AIImageProviderFetchResult,
  type Context,
} from "@downcity/city";

class MyImageProvider extends Provider {
  constructor() {
    super({ id: "my-image", envKey: "MY_IMAGE_API_KEY" });
  }

  async image_create(ctx: Context): Promise<AIImageProviderCreateResult> {
    const input = ctx.input as { prompt?: string; size?: string };
    const api_key = ctx.env("MY_IMAGE_API_KEY");
    const response = await fetch("https://image.example.com/jobs", {
      method: "POST",
      headers: {
        authorization: `Bearer ${api_key}`,
        "content-type": "application/json",
      },
      body: JSON.stringify({
        prompt: input.prompt,
        size: input.size,
      }),
    });
    const data = await response.json() as { id?: string; error?: string };
    if (!response.ok || !data.id) {
      return {
        job_id: `img_${crypto.randomUUID()}`,
        status: "failed",
        error: data.error ?? response.statusText,
      };
    }
    return {
      job_id: `img_${crypto.randomUUID()}`,
      status: "running",
      message: "submitted",
      poll_after_ms: 3000,
      metadata: {
        upstream_job_id: data.id,
        upstream_model: "image-basic",
      },
    };
  }

  async image_fetch(ctx: Context): Promise<AIImageProviderFetchResult> {
    const image_job = ctx.locals.ai_image_job as {
      record: { job_id: string; user_id?: string | null; city_id?: string | null };
      state?: { upstream_job_id?: string; upstream_model?: string };
    };
    const upstream_job_id = image_job.state?.upstream_job_id;
    if (!upstream_job_id) {
      return {
        job_id: image_job.record.job_id,
        status: "failed",
        error: "Missing upstream image job id",
      };
    }

    const api_key = ctx.env("MY_IMAGE_API_KEY");
    const response = await fetch(`https://image.example.com/jobs/${upstream_job_id}`, {
      headers: {
        authorization: `Bearer ${api_key}`,
      },
    });
    const data = await response.json() as {
      status?: "queued" | "running" | "succeeded" | "failed";
      image_url?: string;
      error?: string;
      usage?: unknown;
    };

    if (!response.ok || data.status === "failed") {
      return {
        job_id: image_job.record.job_id,
        status: "failed",
        error: data.error ?? response.statusText,
        metadata: image_job.state,
      };
    }

    if (data.status === "queued" || data.status === "running" || !data.image_url) {
      return {
        job_id: image_job.record.job_id,
        status: data.status ?? "running",
        message: "generating",
        poll_after_ms: 3000,
        metadata: image_job.state,
      };
    }

    return {
      job_id: image_job.record.job_id,
      status: "succeeded",
      result: {
        id: `msg_${crypto.randomUUID()}`,
        role: "assistant",
        parts: [{
          type: "file",
          mediaType: "image/png",
          url: data.image_url,
        }],
      },
      metadata: {
        ...image_job.state,
        user_id: image_job.record.user_id,
        city_id: image_job.record.city_id,
        usage: data.usage,
      },
    };
  }
}

const ai = new AIService({ balance });
const image_provider = new MyImageProvider();

ai.use(image_provider.model({
  id: "image-basic",
  name: "Image Basic",
  default: ["image"],
  bill(ctx, output) {
    return {
      user_id: output.metadata?.user_id,
      amount_microcredits: 50_000,
      ref: `image:${output.job_id}`,
      note: "image generation",
      metadata: { usage: output.metadata?.usage },
    };
  },
}));

Queue is a Federation-level capability and does not need to be passed into AIService:

fed.queue.use(cloudflareQueue(env.DOWNCITY_QUEUE));
fed.use(ai);

When the Cloudflare Queue consumer receives a message, hand it back to Federation:

for (const message of batch.messages) {
  await fed.queue.call(message.body);
  message.ack();
}
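If a handler can throw, acknowledging per message keeps one failure from affecting the rest of the batch. The sketch below is one possible shape, not something this guide prescribes: QueueMessageLike and drainBatch are hypothetical names, and retry() is assumed to behave like the per-message retry on Cloudflare Queue messages.

```typescript
// Minimal structural type covering the message methods used here (assumed).
interface QueueMessageLike<T> {
  body: T;
  ack(): void;
  retry(): void;
}

async function drainBatch<T>(
  messages: readonly QueueMessageLike<T>[],
  handle: (body: T) => Promise<void>,
): Promise<{ acked: number; retried: number }> {
  let acked = 0;
  let retried = 0;
  for (const message of messages) {
    try {
      await handle(message.body);
      message.ack(); // handled successfully: remove from the queue
      acked += 1;
    } catch {
      message.retry(); // handler failed: ask the queue to redeliver this message
      retried += 1;
    }
  }
  return { acked, retried };
}
```

In a consumer this could be called as drainBatch(batch.messages, (body) => fed.queue.call(body)), assuming fed.queue.call returns a promise.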

The client stays provider-neutral:

const job = await client.ai.image_create({
  model: "image-basic",
  prompt: "A clean product photo of a ceramic mug",
  ratio: "1:1",
  quality: "standard",
  count: 1,
});

const current = await client.ai.image_result({ job_id: job.job_id });

If the task is still queued or running, the frontend should call image_result() again after poll_after_ms. image_result() only reads the async_jobs cache. The background Queue calls ai.image/fetch; AIService then calls the Provider's image_fetch() to fetch upstream state and store it back into async_jobs.
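On the frontend, the polling described above could look like the following sketch. waitForImage, ImageJobView, the 3000 ms default delay, and the attempt cap are hypothetical; the reader function is injected so the loop does not depend on a particular client.

```typescript
// Hypothetical view of what image_result() returns; only the fields used here.
type ImageJobView = {
  job_id: string;
  status: "queued" | "running" | "succeeded" | "failed";
  poll_after_ms?: number;
};

async function waitForImage(
  readResult: (job_id: string) => Promise<ImageJobView>,
  job_id: string,
  maxAttempts = 60,
): Promise<ImageJobView> {
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    const current = await readResult(job_id);
    // Terminal states end the loop.
    if (current.status === "succeeded" || current.status === "failed") {
      return current;
    }
    // Still queued or running: wait for the hinted delay before polling again.
    const delay = current.poll_after_ms ?? 3000;
    await new Promise<void>((resolve) => setTimeout(resolve, delay));
  }
  throw new Error(`image job ${job_id} still pending after ${maxAttempts} polls`);
}
```

With the client from the example above, the call would be something like waitForImage((id) => client.ai.image_result({ job_id: id }), job.job_id).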

Provider Job Contract

Callers only use client.ai.image_create() and client.ai.image_result(). Provider implementations normally use two methods:

class ImageProvider extends Provider {
  async image_create(ctx: Context): Promise<AIImageProviderCreateResult> {
    // Create/start upstream work. AIService stores ctx.input and this metadata in async_jobs.
    return { job_id: "upstream_or_local_job_id", status: "running" };
  }

  async image_fetch(ctx: Context): Promise<AIImageProviderFetchResult> {
    // Read ctx.locals.ai_image_job, query upstream, and return the latest state or final result.
    return { job_id: String(ctx.input.job_id), status: "running", poll_after_ms: 2000 };
  }
}

image_fetch is required for image models. A model exposes the image capability only when it provides both image_create and image_fetch.

AIService owns job storage, Queue scheduling, state caching, final result persistence, and idempotent billing. Provider-specific code owns only upstream calls, upstream polling, usage parsing, and result conversion to AI SDK UIMessage file parts.

Auto Passthrough

When baseURL + envKey are present but no openai method is defined, AIService generates a passthrough action automatically:

  • The OpenAI-compatible body is forwarded as-is.
  • The upstream Response is returned as-is.
  • passthroughModel replaces body.model when set.

// No need to write this; AIService handles it
// async openai(ctx) { return fetch(`${baseURL}/chat/completions`, ...) }
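
The model replacement rule from the list above amounts to a one-line transform. The helper name applyPassthroughModel is hypothetical and this is not how AIService is necessarily written; it only illustrates that the body is otherwise forwarded unchanged.

```typescript
// Returns the body to forward upstream. When passthroughModel is set it
// replaces body.model; every other field is passed through untouched.
function applyPassthroughModel(
  body: Record<string, unknown>,
  passthroughModel?: string,
): Record<string, unknown> {
  return passthroughModel ? { ...body, model: passthroughModel } : body;
}
```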

Custom OpenAI-compatible Handler

Provide an openai method when the upstream needs request or response customization:

import {
  Provider,
  type AIProviderChargedResponse,
  type Context,
  normalizeOpenAICompatibleBody,
  readRequiredEnv,
  trimTrailingSlash,
} from "@downcity/city";

class OpenAICustomProvider extends Provider {
  constructor() {
    super({ id: "openai-custom", envKey: "OPENAI_API_KEY", baseURL: "https://api.openai.com/v1" });
  }

  async openai(ctx: Context): Promise<AIProviderChargedResponse> {
    const apiKey = readRequiredEnv(ctx, this.envKey ?? "");
    const body = normalizeOpenAICompatibleBody(
      ctx.input as Record<string, unknown>,
      this.passthroughModel ?? "",
    );
    const response = await fetch(`${trimTrailingSlash(this.baseURL ?? "")}/chat/completions`, {
      method: "POST",
      headers: { authorization: `Bearer ${apiKey}`, "content-type": "application/json" },
      body: JSON.stringify(body),
    });
    return { response };
  }
}

Provider Billing

Use bill(ctx, output) when a provider or a specific model needs to turn a finished AI call into a charge draft. bill does not deduct balance by itself. AIService reads the returned charge line and calls balance.charge() through the configured BalanceService bridge.

For image jobs, bill(ctx, output) is called only when background image/fetch first advances the job to status: "succeeded" and writes result_json. image_create() never bills. User image_result() only reads the async_jobs cache, does not call the Provider, and does not bill again. If Queue consumption runs without a user context, return user_id in the charge line from the stored job owner.
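The bill-once rule can be sketched as a state transition over a stored job. StoredImageJob, FetchOutcome, and advanceImageJob are hypothetical names and the record shape is an assumption; the point is that billing hangs off the first transition into succeeded, and terminal jobs are never advanced again.

```typescript
type ImageJobStatus = "queued" | "running" | "succeeded" | "failed";

// Hypothetical stored-job shape; the real async_jobs columns may differ.
type StoredImageJob = {
  job_id: string;
  owner_user_id: string;
  status: ImageJobStatus;
  result_json?: string;
};

type FetchOutcome = { status: ImageJobStatus; result?: unknown };

function advanceImageJob(
  job: StoredImageJob,
  outcome: FetchOutcome,
  bill: (job: StoredImageJob) => void,
): StoredImageJob {
  // Terminal jobs are never advanced again, so they cannot be billed twice.
  if (job.status === "succeeded" || job.status === "failed") return job;

  const updated: StoredImageJob = { ...job, status: outcome.status };
  if (outcome.status === "succeeded") {
    updated.result_json = JSON.stringify(outcome.result ?? null);
    bill(updated); // first transition into "succeeded": bill exactly once
  }
  return updated;
}
```

The bill callback receives the stored job, so it can read owner_user_id even when the Queue consumer runs without a user context.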

The usual priority is:

  1. an action explicitly returns charge
  2. the model provides bill(ctx, output)
  3. the provider provides bill(ctx, output)
  4. no charge is created

import { Provider, type Context, normalizeAIUsage } from "@downcity/city";

class DeepSeekProvider extends Provider {
  // ...
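  protected bill(ctx: Context, output: unknown) {
    // `output` is unknown here, so narrow it before reading metadata.
    const usage = (output as { metadata?: { usage?: unknown } } | undefined)?.metadata?.usage;
    const normalized = normalizeAIUsage(usage);
    // compute amount_microcredits from `normalized` (pricing is provider-specific)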
    return {
      amount_microcredits: 0,
      note: `DeepSeek ${ctx.variant?.id}`,
      metadata: { raw_usage: usage },
    };
  }
}

Model registration can override provider billing:

ai.use(deepseek.model({
  id: "deepseek-v4-pro",
  name: "DeepSeek V4 Pro",
  bill(ctx, output) {
    return {
      amount_microcredits: 20_000,
      note: "DeepSeek V4 Pro request",
      metadata: {
        model_id: ctx.variant?.id,
        usage: output?.metadata?.usage,
      },
    };
  },
}));

You can also return charge directly from an action method:

async text(ctx: Context) {
  const result = await callUpstream(ctx.input);
  return {
    output: toUIMessage(result),
    charge: {
      amount_microcredits: priceUpstreamUsage(result.usage),
      note: "custom text",
    },
  };
}
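
Taken together, the priority order listed earlier in this section behaves like a first-match chain. The sketch below uses hypothetical names (ChargeDraft, BillFn, resolveCharge) and assumes that once a model-level bill exists it is used even if it returns nothing; whether the real AIService falls through to the provider in that case is not stated here.

```typescript
// Hypothetical minimal charge shape for illustration.
type ChargeDraft = { amount_microcredits: number; note?: string };
type BillFn = (output: unknown) => ChargeDraft | undefined;

function resolveCharge(
  actionCharge: ChargeDraft | undefined,
  modelBill: BillFn | undefined,
  providerBill: BillFn | undefined,
  output: unknown,
): ChargeDraft | undefined {
  if (actionCharge) return actionCharge; // 1. the action returned a charge
  if (modelBill) return modelBill(output); // 2. model-level bill
  if (providerBill) return providerBill(output); // 3. provider-level bill
  return undefined; // 4. no charge is created
}
```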

Provider Fields & Methods

| Field | Required | Description |
| --- | --- | --- |
| id | Yes | Provider unique id |
| env | No | Env var declarations |
| baseURL | Required for passthrough | Upstream API URL |
| envKey | Required for passthrough | Env var name for API key |
| passthroughModel | No | Replaces body.model during passthrough |

| Method | Description |
| --- | --- |
| createClient | Override when text/stream is needed; returns an OpenAI-compatible client |
| text | SDK text action (default from base class) |
| stream | SDK stream action (default from base class) |
| image_create | Create and start an image provider job |
| image_fetch | Query image provider job state in the background and return the final result |
| video | SDK video action |
| openai | /chat/completions action (auto-passthrough if omitted) |
| bill | Return a charge draft for the finished call |
| extractPrompt | Extract prompt from input (overridable) |

Helper Functions

@downcity/city exports common helpers for Provider subclasses:

  • readRequiredEnv(ctx, key)
  • resolveUpstreamModel(ctx, fallback)
  • trimTrailingSlash(url)
  • buildAssistantMessage(text, ctx, result, charge?)
  • buildImageMessage(ctx, images, metadata)
  • normalizeOpenAICompatibleBody(input, model)
  • readOpenAICompatibleSseUsage(body)
  • normalizeAIUsage(usage)
  • buildToolSet(items)