2025, Dec 28 09:00

How to Publish Avro Schema Versions to Apicurio Schema Registry 3.0.x Without 400 Errors

Learn why publishing a new Avro schema version to Apicurio Schema Registry 3.0.x returns 400, and fix it by sending a JSON content wrapper for the versions API.

Publishing a new version of an Avro schema to Apicurio Schema Registry 3.0.7 can fail in a non-obvious way if the request body isn’t shaped exactly as the API expects. The first version registers fine, but the second version responds with a 400 and a hint that a required parameter is missing. Below is a compact walkthrough of the problem, the exact reason behind it, and the fix that works reliably with the 3.0.x API.

Reproducing the issue

The flow is straightforward: obtain a Keycloak token, check if the artifact exists, then either create the artifact or upload a new version. The following code demonstrates the failing approach when posting a new version.

import requests
import json
import urllib.parse
# Configuration
# Keycloak realm base URL; the token endpoint path is appended to it.
OIDC_REALM       = "https://keycloak.vkp.versa-vani.com/realms/readonly-realm"
# Client ID and secret sent as HTTP basic auth for the client_credentials grant.
SR_CLIENT_ID     = "apicurio-registry"
SR_CLIENT_SECRET = "<secret>"  # placeholder value; replace before running
# Base URL of the registry REST API (v3) that every request below is built on.
REG_API          = "https://apicurio-sr.vkp.versa-vani.com/apis/registry/v3"
# Group and artifact ID under which the schema is checked, created and versioned.
SCHEMA_GROUP     = "default"
SCHEMA_ID        = "com.versa.apicurio.confluent.Employee"
def issue_service_token():
    """Fetch an access token from Keycloak using the client-credentials grant.

    Returns:
        The ``access_token`` string from the token endpoint's JSON response.

    Raises:
        requests.HTTPError: If the token endpoint answers with a 4xx/5xx status.
        requests.Timeout: If the endpoint does not answer within the timeout.
    """
    token_endpoint = f"{OIDC_REALM}/protocol/openid-connect/token"
    payload = {"grant_type": "client_credentials"}
    # requests applies no timeout by default; without one a stalled identity
    # provider would hang the script indefinitely.
    r = requests.post(
        token_endpoint,
        data=payload,
        auth=(SR_CLIENT_ID, SR_CLIENT_SECRET),
        timeout=30,
    )
    r.raise_for_status()
    return r.json()["access_token"]
def register_or_update_schema(bearer_token):
    """Create the Employee Avro artifact, or try to add a version if it exists.

    This is the reproduction of the failing flow: when the artifact already
    exists, the raw schema JSON is posted to the ``/versions`` endpoint as the
    request body, which is the request shape this walkthrough reports as
    rejected with a 400 (missing ``content`` parameter).

    Args:
        bearer_token: Access token sent as ``Authorization: Bearer <token>``.

    Raises:
        requests.HTTPError: If the existence check returns an unexpected
            4xx/5xx status, or the publish/create call returns 4xx/5xx.
    """
    request_timeout = 30  # seconds; requests never times out on its own
    # Percent-encode the ID so it is always a single path segment.
    artifact_path = urllib.parse.quote(SCHEMA_ID, safe="")
    check_url = f"{REG_API}/groups/{SCHEMA_GROUP}/artifacts/{artifact_path}"
    base_headers = {"Authorization": f"Bearer {bearer_token}"}
    check = requests.get(check_url, headers=base_headers, timeout=request_timeout)
    print("check.status_code:", check.status_code)
    schema_doc = {
        "type": "record",
        "name": "Employee",
        "namespace": "com.versa.apicurio.confluent",
        "fields": [
            {"name": "id",       "type": "int"},
            {"name": "name",     "type": "string"},
            {"name": "salary",   "type": ["null", "float"],  "default": None},
            {"name": "age",      "type": ["null", "int"],    "default": None},
            {
                "name": "department",
                "type": {
                    "type": "enum",
                    "name": "DepartmentEnum",
                    "symbols": ["HR", "ENGINEERING", "SALES"]
                }
            },
            {"name": "email",    "type": ["null", "string"], "default": None},
            {"name": "new_col",  "type": ["null", "string"], "default": None},
            {"name": "new_col2", "type": ["null", "string"], "default": None},
            {"name": "new_col3", "type": ["null", "string"], "default": None},
        ]
    }
    payload_str = json.dumps(schema_doc)
    if check.status_code == 200:
        print("Artifact exists, uploading a new version...")
        publish_url = f"{check_url}/versions"
        headers_publish = {
            "Authorization": f"Bearer {bearer_token}",
            "Content-Type": "application/json"
        }
        # Failing call: raw schema sent as request body
        resp = requests.post(
            publish_url,
            headers=headers_publish,
            data=payload_str,
            timeout=request_timeout,
        )
    elif check.status_code == 404:
        print("Artifact not found, creating new artifact...")
        create_url = f"{REG_API}/groups/{SCHEMA_GROUP}/artifacts"
        headers_create = {
            "Authorization": f"Bearer {bearer_token}",
            "Content-Type": "application/json",
            "X-Registry-ArtifactId": SCHEMA_ID,
            "X-Registry-ArtifactType": "AVRO"
        }
        resp = requests.post(
            create_url,
            headers=headers_create,
            data=payload_str,
            timeout=request_timeout,
        )
    else:
        print(f"Unexpected error while checking artifact: {check.status_code} - {check.text}")
        check.raise_for_status()
        return
    # Reported once for both branches.
    print("Publish response:", resp.status_code, resp.text)
    if resp.ok:
        print("Schema published successfully!")
        # Parse the body once; a successful response without a JSON body must
        # not turn into an exception here.
        try:
            body = resp.json()
        except ValueError:
            body = None
        if isinstance(body, dict) and "globalId" in body:
            print("Registered globalId:", body.get("globalId"))
    else:
        print("Error publishing schema:", resp.status_code, resp.text)
    resp.raise_for_status()
def enumerate_artifacts_and_versions(bearer_token, artifact_id):
    """List the group's artifacts and dump every version of one of them.

    Every artifact returned by the listing is printed; for the entry whose ID
    equals ``artifact_id`` the versions, their metadata (``globalId``) and
    their schema content are fetched and printed as well.

    Args:
        bearer_token: Access token sent as ``Authorization: Bearer <token>``.
        artifact_id: ID of the artifact whose versions should be dumped.

    Returns:
        The list of artifact entries from the listing, or ``None`` when the
        listing response is neither a dict nor a list.

    Raises:
        requests.HTTPError: If the artifact listing returns a 4xx/5xx status.
    """
    request_timeout = 30  # seconds; requests never times out on its own
    list_url = f"{REG_API}/groups/{SCHEMA_GROUP}/artifacts"
    headers = {"Authorization": f"Bearer {bearer_token}", "Accept": "application/json"}
    # NOTE(review): only this single listing response is scanned; if the
    # registry paginates the listing, later pages are not read — confirm.
    r = requests.get(list_url, headers=headers, timeout=request_timeout)
    print("Get schemas response:", r.status_code)
    r.raise_for_status()
    # Decode the body once and reuse it for all diagnostics below.
    artifacts_payload = r.json()
    print("Raw artifacts response:", json.dumps(artifacts_payload, indent=2))
    print("type(r.json()) ->", type(artifacts_payload))
    print(f"artifacts_payload: {artifacts_payload}")
    if isinstance(artifacts_payload, dict):
        artifacts = artifacts_payload.get("artifacts", [])
    elif isinstance(artifacts_payload, list):
        artifacts = artifacts_payload
    else:
        print("Unexpected response format for artifacts list!")
        return
    print(f"Found {len(artifacts)} artifacts in group `{SCHEMA_GROUP}`:")
    for item in artifacts:
        print(f"  - Artifact: {item}")
        # Entries may be metadata dicts or bare IDs; handle both.
        if isinstance(item, dict):
            artifact_id_resp = item.get("artifactId")
            created_by = item.get("createdBy", "<unknown>")
        else:
            artifact_id_resp = item
            created_by = "<unknown>"
        print(f"artifactId: {artifact_id_resp}, created_by: {created_by}")
        if not artifact_id_resp:
            print(f"  - Skipping artifact with missing ID: {item}")
            continue
        if artifact_id != artifact_id_resp:
            continue
        print(f"  - Artifact ID: {artifact_id_resp}, createdBy: {created_by}")
        _dump_artifact_versions(artifact_id_resp, headers, request_timeout)
    return artifacts


def _dump_artifact_versions(artifact_id, headers, timeout):
    """Print the version list of one artifact, then the details of each version."""
    # Percent-encode once; every URL below reuses the same path prefix.
    versions_url = (
        f"{REG_API}/groups/{SCHEMA_GROUP}/artifacts/"
        f"{urllib.parse.quote(str(artifact_id), safe='')}/versions"
    )
    vr = requests.get(versions_url, headers=headers, timeout=timeout)
    print("Get versions response:", vr.status_code)
    if vr.status_code != 200:
        print(f"    Failed to get versions: {vr.status_code} - {vr.text}")
        return
    versions_data = vr.json().get("versions", [])
    print(f"Found {len(versions_data)} versions for artifact `{artifact_id}`:")
    print(f"  - Versions data: {versions_data}")
    version_ids = [v.get("version") for v in versions_data]
    print(f"Versions: {version_ids}")
    for ver in version_ids:
        _dump_version(versions_url, ver, headers, timeout)


def _dump_version(versions_url, ver, headers, timeout):
    """Print the globalId and the schema content of a single artifact version."""
    # The version is a path segment too, so it is percent-encoded like the ID.
    version_meta_url = f"{versions_url}/{urllib.parse.quote(str(ver), safe='')}"
    meta = requests.get(version_meta_url, headers=headers, timeout=timeout)
    print("Get version metadata response:", meta.status_code)
    if meta.status_code != 200:
        print(
            f"      - Failed to get version {ver} metadata: "
            f"{meta.status_code} - {meta.text}"
        )
        return
    global_id = meta.json().get("globalId", "<not found>")
    print(f"      - Version: {ver}, globalId: {global_id}")
    content_resp = requests.get(
        f"{version_meta_url}/content", headers=headers, timeout=timeout
    )
    if content_resp.status_code == 200:
        print(f"        Schema content for version {ver}:\n{content_resp.text}")
    else:
        print(
            f"        Failed to get schema content for version {ver}: "
            f"{content_resp.status_code} - {content_resp.text}"
        )
if __name__ == "__main__":
    # Script entry point: authenticate, publish the schema, then list what the
    # registry holds for it.
    tok = issue_service_token()
    # Only the length is printed: the bearer token is a live credential and
    # must not end up in terminal scrollback or captured logs.
    print("Token acquired, length:", len(tok))
    register_or_update_schema(tok)
    print("listing - enumerate_artifacts_and_versions")
    enumerate_artifacts_and_versions(tok, SCHEMA_ID)

What’s actually going wrong

The first publish succeeds because creating an artifact accepts the schema as the raw request body with headers that specify the artifact metadata. The second publish targets the versions endpoint, which expects a different request shape. Sending the schema directly as the body causes the server to reject the request with a 400 because the required content parameter isn't present:

MissingRequiredParameterException: Request is missing a required parameter: content

In other words, the versions endpoint requires a JSON envelope that includes a content field, not just the raw schema JSON.

The fix: send the nested content structure

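When posting a new version in 3.0.x, wrap the serialized schema string in the structure the endpoint expects: a JSON object whose outer content field holds the schema string (content) and its contentType. Below is the full function with the corrected version branch; the artifact-creation branch is unchanged.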

def register_or_update_schema(bearer_token):
    """Create the Employee Avro artifact, or publish a new version if it exists.

    The artifact is looked up first. If it exists (200), the schema is posted
    to the ``/versions`` endpoint wrapped in the nested ``content`` envelope
    that endpoint requires; if it does not (404), the artifact is created.

    Args:
        bearer_token: Access token sent as ``Authorization: Bearer <token>``.

    Raises:
        requests.HTTPError: If the existence check returns an unexpected
            4xx/5xx status, or the publish/create call returns 4xx/5xx.
    """
    request_timeout = 30  # seconds; requests never times out on its own
    # Percent-encode the ID so it is always a single path segment.
    artifact_path = urllib.parse.quote(SCHEMA_ID, safe="")
    check_url = f"{REG_API}/groups/{SCHEMA_GROUP}/artifacts/{artifact_path}"
    base_headers = {"Authorization": f"Bearer {bearer_token}"}
    check = requests.get(check_url, headers=base_headers, timeout=request_timeout)
    print("check.status_code:", check.status_code)
    schema_doc = {
        "type": "record",
        "name": "Employee",
        "namespace": "com.versa.apicurio.confluent",
        "fields": [
            {"name": "id",       "type": "int"},
            {"name": "name",     "type": "string"},
            {"name": "salary",   "type": ["null", "float"],  "default": None},
            {"name": "age",      "type": ["null", "int"],    "default": None},
            {
                "name": "department",
                "type": {
                    "type": "enum",
                    "name": "DepartmentEnum",
                    "symbols": ["HR", "ENGINEERING", "SALES"]
                }
            },
            {"name": "email",    "type": ["null", "string"], "default": None},
            {"name": "new_col",  "type": ["null", "string"], "default": None},
            {"name": "new_col2", "type": ["null", "string"], "default": None},
            {"name": "new_col3", "type": ["null", "string"], "default": None},
        ]
    }
    payload_str = json.dumps(schema_doc)
    if check.status_code == 200:
        print("Artifact exists, uploading a new version...")
        publish_url = f"{check_url}/versions"
        # The versions endpoint expects a nested content structure: the schema
        # travels as a string inside content.content, not as the body itself.
        version_payload = {
            "content": {
                "content": payload_str,
                "contentType": "application/json"
            }
        }
        resp = requests.post(
            publish_url,
            headers={"Authorization": f"Bearer {bearer_token}", "Content-Type": "application/json"},
            json=version_payload,
            timeout=request_timeout,
        )
    elif check.status_code == 404:
        print("Artifact not found, creating new artifact...")
        create_url = f"{REG_API}/groups/{SCHEMA_GROUP}/artifacts"
        # NOTE(review): creation keeps the raw-body + X-Registry-* header form
        # used throughout this walkthrough — confirm it against the registry's
        # v3 create-artifact request schema.
        headers_create = {
            "Authorization": f"Bearer {bearer_token}",
            "Content-Type": "application/json",
            "X-Registry-ArtifactId": SCHEMA_ID,
            "X-Registry-ArtifactType": "AVRO"
        }
        resp = requests.post(
            create_url,
            headers=headers_create,
            data=payload_str,
            timeout=request_timeout,
        )
    else:
        print(f"Unexpected error while checking artifact: {check.status_code} - {check.text}")
        check.raise_for_status()
        return
    print("Publish response:", resp.status_code, resp.text)
    if resp.ok:
        print("Schema published successfully!")
        # Parse the body once; a successful response without a JSON body must
        # not turn into an exception here.
        try:
            body = resp.json()
        except ValueError:
            body = None
        if isinstance(body, dict) and "globalId" in body:
            print("Registered globalId:", body.get("globalId"))
    else:
        print("Error publishing schema:", resp.status_code, resp.text)
    resp.raise_for_status()

With this body shape, the versions endpoint receives the content parameter it requires and accepts the new version.

Why this matters

Artifact creation and version publication are not symmetric in this API. Treating both the same leads to a 400 response that can be puzzling at first glance, especially since the raw schema body works during the initial publish. Knowing the exact payload format for the versions endpoint prevents failed deployments and avoids manual retries that don’t address the underlying mismatch.

Conclusion

If the first publish succeeds but the second fails with MissingRequiredParameterException: content, the request body for the versions endpoint is the culprit. Keep artifact creation as-is, but for new versions, post a JSON object with a content wrapper that includes the schema string and contentType. This small change aligns the request with what Apicurio Registry 3.0.x expects and makes version publishing predictable.