2025, Sep 23 09:00

Resolving PySpark Structured Streaming state deserialization failures: replace StructType value state with separate list states

Fix PySpark Streaming StatefulProcessor state deserialization in transformWithStateInPandas by replacing a StructType value state with two list states.

When building a real-time fraud detection prototype on PySpark Structured Streaming with StatefulProcessor and transformWithStateInPandas, a subtle state serialization issue can surface right after the first micro-batch. Everything initializes fine and the first batch is processed, but the second batch crashes with a PythonException raised during state retrieval, wrapping a PySparkRuntimeError that reports it “couldn’t introspect javabean” with “wrong number of arguments”. The failure occurs while reading back a value state that holds arrays, which breaks the pipeline mid-stream.

Reproducible example

The following program demonstrates the issue. It stores two arrays in a value state and updates them per key as new micro-batches arrive. The overall streaming logic matches the real pipeline; only the names have been changed for clarity.

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, unix_timestamp
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    DoubleType,
    TimestampType,
    ArrayType,
    LongType,
)
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle

import pandas as pd


class AccumProc(StatefulProcessor):
    def init(self, ctrl: StatefulProcessorHandle):
        # A single value state whose schema bundles two arrays: this is the
        # shape that later fails to deserialize on the second micro-batch.
        self._bundle = ctrl.getValueState(
            "bundle",
            StructType([
                StructField("places", ArrayType(StringType())),
                StructField("ticks", ArrayType(LongType())),
            ])
        )

    def handleInputRows(self, k, batches, timers):
        if not self._bundle.exists():
            snapshot = {"places": [], "ticks": []}
        else:
            # This get() is where the second micro-batch fails while
            # deserializing the previously stored arrays.
            snapshot = {"places": self._bundle.get()[0], "ticks": self._bundle.get()[1]}

        inc_places = []
        inc_ticks = []
        for frame in batches:
            inc_places.extend(frame["location"].tolist())
            inc_ticks.extend(frame["unix_timestamp"].tolist())

        snapshot["places"].extend(inc_places)
        snapshot["ticks"].extend(inc_ticks)

        self._bundle.update((snapshot["places"], snapshot["ticks"]))

        # Empty output kept for brevity; in practice the yielded DataFrame
        # should match the declared outputStructType.
        yield pd.DataFrame()


def run():
    sess = (
        SparkSession.builder.appName("RealTimeFraudDetector")
        .config(
            "spark.sql.streaming.stateStore.providerClass",
            "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider",
        )
        .getOrCreate()
    )
    sess.sparkContext.setLogLevel("WARN")

    tx_schema = StructType(
        [
            StructField("transaction_id", StringType(), True),
            StructField("card_number", StringType(), True),
            StructField("card_holder", StringType(), True),
            StructField("amount", DoubleType(), True),
            StructField("currency", StringType(), True),
            StructField("location", StringType(), True),
            StructField("timestamp", TimestampType(), True),
        ]
    )

    out_schema = StructType(
        [
            StructField("card_number", StringType(), True),
            StructField("is_fraud", StringType(), True),
            StructField("message", StringType(), True),
        ]
    )

    kafka_stream = (
        sess.readStream.format("kafka")
        .option("kafka.bootstrap.servers", "broker:29092")
        .option("subscribe", "transaction")
        .load()
    )

    tx_df = (
        kafka_stream.select(from_json(col("value").cast("string"), tx_schema).alias("payload"))
        .select("payload.*")
        .withColumn("unix_timestamp", unix_timestamp(col("timestamp")))
    )

    enriched_df = (
        tx_df.withWatermark("timestamp", "10 minutes")
        .groupBy("card_number")
        .transformWithStateInPandas(
            AccumProc(), outputStructType=out_schema, outputMode="append", timeMode="None"
        )
    )

    query = enriched_df.writeStream.outputMode("append").format("console").start()

    query.awaitTermination()


if __name__ == "__main__":
    run()

What is actually going wrong

The failure happens on the second micro-batch when reading back the previously saved value state. The stack trace shows the exception originates from the get call on the state handle and bubbles up through Arrow serialization, ending with “couldn’t introspect javabean: java.lang.IllegalArgumentException: wrong number of arguments”. In practice, the symptom is that a StructType-based value state that carries arrays of primitive types is written but cannot be deserialized on the next access in this setup.

The practical fix

A reliable way to avoid the deserialization error is to split the composite value state into two list states. Each list state has a single-field schema and stores one of the collections separately. With that change, saving and loading state becomes stable.

from pyspark.sql.types import StructType, StructField, StringType, LongType
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle


class AccumProc(StatefulProcessor):
    def init(self, ctrl: StatefulProcessorHandle):
        # One single-field schema per list state keeps the stored rows simple.
        ts_list_schema = StructType([StructField("ts", LongType(), True)])
        loc_list_schema = StructType([StructField("loc", StringType(), True)])
        self._ts_state = ctrl.getListState(stateName="tsState", schema=ts_list_schema)
        self._loc_state = ctrl.getListState(stateName="locState", schema=loc_list_schema)

With separate list states, the processor can persist and retrieve arrays of timestamps and locations without hitting Arrow deserialization issues.
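
Continuing the class above, handleInputRows could then append new values and read back the history through the two list states. The following is a minimal sketch, not the original answer's code: it assumes Spark 4.0's ListState API (exists, get, appendList), reuses the pandas import (pd) from the full example, and assumes the grouping key arrives as a one-element tuple; the is_fraud placeholder and the summary message are purely illustrative.

    def handleInputRows(self, key, rows, timers):
        # Read back what earlier micro-batches stored; each element is a
        # one-field tuple matching the single-field schemas declared in init.
        prior_ticks = [row[0] for row in self._ts_state.get()] if self._ts_state.exists() else []
        prior_places = [row[0] for row in self._loc_state.get()] if self._loc_state.exists() else []

        # Collect the values arriving in this micro-batch.
        new_ticks, new_places = [], []
        for frame in rows:
            new_ticks.extend(frame["unix_timestamp"].tolist())
            new_places.extend(frame["location"].tolist())

        # Append the new values as one-field tuples.
        if new_ticks:
            self._ts_state.appendList([(int(t),) for t in new_ticks])
        if new_places:
            self._loc_state.appendList([(str(p),) for p in new_places])

        # Emit one row per key, shaped like the out_schema declared for
        # transformWithStateInPandas (card_number, is_fraud, message).
        yield pd.DataFrame(
            {
                "card_number": [key[0]],
                "is_fraud": ["false"],  # placeholder; real scoring logic would go here
                "message": [f"{len(prior_ticks) + len(new_ticks)} events tracked for this card"],
            }
        )

Keeping each stored element a one-field tuple mirrors the single-field schemas, so there is no composite struct for Arrow to round-trip.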

Why this detail matters

State management is the backbone of any streaming fraud detector, especially when correlating events across micro-batches. If the state store cannot reliably round-trip your data, the stream fails mid-flight and you lose the continuity you rely on for anomaly detection logic. Keeping state schemas simple helps you sidestep serialization landmines and makes your pipeline resilient in production. Also, independent of this specific error, the DataFrame you yield from the processor should align with the outputStructType you declare; mismatches there can cause their own set of failures.

Takeaways

If you hit a “couldn’t introspect javabean” error while retrieving a value state in a PySpark StatefulProcessor, consider storing arrays as two list states with single-field schemas instead of a single StructType value state. This change lets the state serialize and deserialize cleanly through Arrow. Keep the streaming state minimal and well-typed, and double-check that your emitted DataFrames match the declared output schema to keep structured streaming happy.

The article is based on a StackOverflow question by Marco Filippozzi and his own answer.