
fix: unify schema across batches in JSONStreamDatasource to handle null → concrete type evolution#972

Open
fengrui-z wants to merge 5 commits into datajuicer:main from fengrui-z:fix/json-stream-schema-lock

Conversation


@fengrui-z fengrui-z commented Apr 29, 2026

Summary

Fixes #936

JSONStreamDatasource._read_stream locks the schema from the first batch and reuses it for all subsequent batches. When an
early batch infers a nested field as null (e.g. meta.url = null) and a later batch introduces a concrete type (e.g.
string), the forced cast from string to null fails with ArrowInvalid.

This is a correctness bug in DJ's custom JSON streaming ingestion path. Ray's native ray.data.read_json handles the same input correctly.

Root Cause

```python
# Before: the first batch locks the schema; every later batch is forced to it
table = pyarrow.Table.from_batches([batch], schema=schema)
if schema is None:
    schema = table.schema  # locked forever
```

Fix

1. Remove the first-batch schema lock: create the table without a forced schema.
2. Use `pyarrow.unify_schemas` to merge schemas across batches, allowing null → concrete type promotion.
3. After unification, cast the batch to the unified schema for consistency.

```python
# After: the schema evolves across batches
table = pyarrow.Table.from_batches([batch])
if schema is None:
    schema = table.schema
else:
    unified = pyarrow.unify_schemas([schema, table.schema])
    if not unified.equals(schema):
        schema = unified
    table = pyarrow.Table.from_batches([batch], schema=schema)
```

`unify_schemas` delegates to Arrow's C++ schema unification, which by default promotes `null` to the concrete type and recursively handles nested structs.

Test Plan

See #936 for the minimal reproduction script.

…ll → concrete type evolution

The previous implementation locked the schema from the first batch and
reused it for all subsequent batches via `Table.from_batches([batch],
schema=schema)`. When an early batch inferred a nested field as `null`
(e.g. `meta.url = null`) and a later batch introduced a concrete type
(e.g. `string`), the cast from `string` to `null` would fail with
ArrowInvalid.

This fix removes the first-batch schema lock and instead uses
`pyarrow.unify_schemas` to merge schemas across batches, allowing
`null` types to be promoted to concrete types as new data is read.

Fixes datajuicer#936

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

@gemini-code-assist (bot) left a comment


Code Review

This pull request updates the _read_stream method in ray_dataset.py to support schema unification when reading batches from a stream. This allows the system to handle batches with varying but compatible schemas. A review comment suggests refactoring the implementation to reduce code duplication by consolidating the pyarrow.Table creation after the final schema has been determined.

Comment thread on data_juicer/core/data/ray_dataset.py (outdated)
@fengrui-z fengrui-z marked this pull request as ready for review April 29, 2026 09:24


Development

Successfully merging this pull request may close these issues.

[Bug] JSONStreamDatasource locks first-batch schema and fails on later null -> concrete type evolution
