feat(iceberg): single JVM per process instead of per stream-chunk #962

Open
vikaxsh wants to merge 33 commits into staging from feat/unified-jvm

vikaxsh (Collaborator) commented May 26, 2026
Description

This PR refactors the Iceberg writer architecture to use a single shared JVM per OLake CLI invocation instead of creating one JVM per writer thread/chunk.

Previously, every Iceberg writer initialization spawned a dedicated JVM-backed Iceberg client and gRPC server, resulting in significant memory overhead under concurrent backfill and CDC workloads. Large syncs could create many JVM processes, leading to excessive memory consumption and potential OOM issues.
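
The once-per-process startup described above can be sketched in Go roughly as follows. This is a minimal illustration, not the PR's code: `sharedServer`, `getSharedServer`, the `start` callback, and the port number are all hypothetical, and the real startup (launching the JVM and its gRPC server) is replaced by a counter so the behaviour is easy to observe.

```go
package main

import (
	"fmt"
	"sync"
)

// sharedServer stands in for the single JVM-backed Iceberg server.
// Hypothetical type: the real one would hold the process handle and gRPC client.
type sharedServer struct {
	port int
}

var (
	once      sync.Once
	server    *sharedServer
	serverErr error
	starts    int // counts how many times start actually ran
)

// getSharedServer starts the server on first use and returns the same
// instance (or the same startup error) to every later caller.
func getSharedServer(start func() (*sharedServer, error)) (*sharedServer, error) {
	once.Do(func() {
		starts++
		server, serverErr = start()
	})
	return server, serverErr
}

func main() {
	// Assumed startup function; the port is an arbitrary placeholder.
	start := func() (*sharedServer, error) { return &sharedServer{port: 50051}, nil }

	var wg sync.WaitGroup
	for i := 0; i < 8; i++ { // eight writer threads, one server
		wg.Add(1)
		go func() {
			defer wg.Done()
			if _, err := getSharedServer(start); err != nil {
				panic(err)
			}
		}()
	}
	wg.Wait()
	fmt.Println("server starts:", starts)
}
```

With `sync.Once`, a failed startup is also remembered, so every writer sees the same error rather than retrying the launch. Whether the PR handles startup failure this way is not stated in the description.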

Changes

  • Introduced a shared JVM architecture where all streams and chunks communicate with a single JVM instance.

  • Added ThreadSession-based isolation to maintain per-stream/per-chunk state within the shared JVM.

  • Moved stream-specific configuration from JVM startup arguments to gRPC request metadata:

    • Namespace
    • Upsert mode
    • Identifier field creation
    • Iceberg partition transforms
  • Added StreamMetaCtx on the Go side to propagate stream-specific metadata with every request.

  • Added ThreadSession management in Java to maintain isolated:

    • Iceberg table handles
    • Table operators
    • Writers
    • Commit state
  • Added CLOSE_SESSION RPC operation for explicit session cleanup and resource release.

  • Retained catalog initialization and other truly global resources at the JVM level.
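
The split between per-request stream metadata and per-thread session state can be sketched as below. This is a simplified Go illustration of the pattern only: in the PR the session state lives in the Java `ThreadSession`, and the names `streamMeta`, `session`, `registry`, `getOrCreate`, and `closeSession` here are hypothetical stand-ins, as are the field names.

```go
package main

import (
	"fmt"
	"sync"
)

// streamMeta is the per-stream configuration that travels with a request
// instead of being passed as JVM startup arguments. Field names are illustrative.
type streamMeta struct {
	ThreadID        string
	Namespace       string
	Upsert          bool
	IdentifierField string
	PartitionFields []string
}

// session holds state that must stay isolated per stream/chunk.
type session struct {
	meta         streamMeta
	pendingFiles []string // stands in for writers and commit state
}

// registry is the process-wide part: one instance shared by all sessions.
type registry struct {
	mu       sync.Mutex
	sessions map[string]*session
}

func newRegistry() *registry {
	return &registry{sessions: make(map[string]*session)}
}

// getOrCreate returns the session for meta.ThreadID, creating it on first use.
// The metadata from the first request wins; later requests reuse the session.
func (r *registry) getOrCreate(meta streamMeta) *session {
	r.mu.Lock()
	defer r.mu.Unlock()
	if s, ok := r.sessions[meta.ThreadID]; ok {
		return s
	}
	s := &session{meta: meta}
	r.sessions[meta.ThreadID] = s
	return s
}

// closeSession drops the session so its state can be garbage collected.
// It reports whether a session existed.
func (r *registry) closeSession(threadID string) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	_, ok := r.sessions[threadID]
	delete(r.sessions, threadID)
	return ok
}

func main() {
	reg := newRegistry()
	a := reg.getOrCreate(streamMeta{ThreadID: "chunk-1", Namespace: "sales", Upsert: true})
	b := reg.getOrCreate(streamMeta{ThreadID: "chunk-2", Namespace: "hr"})

	// Each session is touched only by its owning thread in this sketch,
	// so its fields are modified without holding the registry lock.
	a.pendingFiles = append(a.pendingFiles, "data-0001.parquet")

	fmt.Println(len(a.pendingFiles), len(b.pendingFiles))
	fmt.Println(reg.closeSession("chunk-1"), reg.closeSession("chunk-1"))
}
```

Keying the registry on the thread ID is what lets later requests carry only that ID while the rest of the stream configuration stays with the session created on the first request.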

Benefits

  • JVM heap is allocated only once per OLake run.
  • Significantly reduces memory consumption during concurrent syncs.
  • Eliminates excessive JVM process creation.
  • Preserves stream-level isolation through session-based state management.
  • Improves scalability for parallel backfill and CDC workloads.

Fixes # (issue)

Type of change

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • This change requires a documentation update

How Has This Been Tested?

  • Validated concurrent backfill execution using multiple chunks sharing a single JVM instance.
  • Validated CDC streams operating concurrently through the shared JVM.
  • Verified session creation and cleanup via CLOSE_SESSION.
  • Verified successful table creation, schema evolution, writes, and commits across multiple sessions.
  • Verified memory utilization remains stable compared to the previous multi-JVM architecture.

Screenshots or Recordings

N/A

Documentation

  • Documentation Link: [link to README, olake.io/docs, or olake-docs]
  • N/A (bug fix, refactor, or test changes only)

Related PRs (If Any):

N/A

vikaxsh marked this pull request as ready for review May 26, 2026 10:49
vikaxsh marked this pull request as draft May 26, 2026 10:52
vikaxsh marked this pull request as ready for review June 2, 2026 05:50
// (namespace, upsert, partition spec, identifier-field) was already captured by
// the JVM on the GET_OR_CREATE_TABLE payload during Setup, so RECORDS / COMMIT
// payloads carry only the thread_id the JVM routes on (callers add schema/payload).
func (w *LegacyWriter) newMetadata() *proto.IcebergPayload_Metadata {

Collaborator

we can remove this

case <-done:
case <-ctx.Done():
	logger.Warnf("Context cancelled, killing Iceberg JVM")
	_ = s.cmd.Process.Kill()

Collaborator

Check this logic once again.

Collaborator

this comment is for myself

err := i.server.closeIcebergClient()
if err != nil {
logger.Errorf("Thread[%s]: error closing Iceberg client: %s", i.options.ThreadID, err)
if cleanupErr := i.writer.Cleanup(); cleanupErr != nil {

Collaborator

Do we still need this to be separate?

private static final String FILE_TYPE_EQUALITY_DELETE = "equalityDelete";
private static final String FILE_TYPE_POSITIONAL_DELETE = "positionalDelete";

private final Catalog icebergCatalog;

Collaborator

Can we remove unnecessary variables?

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BooleanSupplier;

Collaborator

not required

session.icebergTable.refresh();
String commitState = session.op.getCommitState(session.icebergTable);
sendResponse(responseObserver, session.icebergTable.schema().toString(),
commitState != null ? commitState : "");

Collaborator

Don't we send some text if there is no state?

// DROP_TABLE carries "db.table" in destTableName and must NOT create
// a per-thread session (computeIfAbsent would load/create the very
// table we're about to drop). Handle it before session setup.
if (request.getType() == IcebergPayload.PayloadType.DROP_TABLE) {

Collaborator

Why do we need to handle this separately?

}
}

private static List<Map<String, String>> toPartitionList(List<IcebergPayload.PartitionField> protos) {

Collaborator

We did not have this previously?

Collaborator

any reason for addition?

// CLOSE_SESSION: just drop the session. No closeQuietly() — it would
// clear op.filesToCommit while an in-flight REGISTER_AND_COMMIT may be
// using that list. Nothing to close here; the session is GC'd.
if (request.getType() == ArrowPayload.PayloadType.CLOSE_SESSION) {

Collaborator

We don't need this in multiple places, right?

2 participants