feat(iceberg): single JVM per process instead of per stream-chunk#962
Open
vikaxsh wants to merge 33 commits into
… into feat/unified-jvm
…se using a cancelled flag
…eat/unified-jvm
… into feat/unified-jvm
hash-data
reviewed
Jun 24, 2026
// (namespace, upsert, partition spec, identifier-field) was already captured by
// the JVM on the GET_OR_CREATE_TABLE payload during Setup, so RECORDS / COMMIT
// payloads carry only the thread_id the JVM routes on (callers add schema/payload).
func (w *LegacyWriter) newMetadata() *proto.IcebergPayload_Metadata {
case <-done:
case <-ctx.Done():
    logger.Warnf("Context cancelled, killing Iceberg JVM")
    _ = s.cmd.Process.Kill()
Collaborator
Check this logic once again.
Collaborator
This comment is for myself.
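For anyone re-checking the cancel path above, the select can be isolated into a small function and exercised without a real JVM. This is only a sketch: `watchProcess` and its `kill` callback are hypothetical names, and the PR's code calls `s.cmd.Process.Kill()` directly. It shows that kill runs only when the cancellation channel fires before the process-exit channel.

```go
package main

import (
	"context"
	"fmt"
)

// watchProcess blocks until the process exits on its own (done is closed)
// or the caller is cancelled (cancelled is closed). It calls kill only in
// the second case and reports whether kill was invoked.
// Hypothetical helper; in the PR the kill call is s.cmd.Process.Kill().
func watchProcess(done, cancelled <-chan struct{}, kill func() error) bool {
	select {
	case <-done:
		return false
	case <-cancelled:
		_ = kill() // best effort: the process may already be gone
		return true
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan struct{}) // would be closed after the process is reaped

	result := make(chan bool, 1)
	go func() {
		result <- watchProcess(done, ctx.Done(), func() error {
			fmt.Println("killing process")
			return nil
		})
	}()

	cancel()
	fmt.Println("killed:", <-result)
}
```

Two things worth verifying in the real code: `done` has to be closed on every exit path so the watcher goroutine does not leak, and if both channels are ready at once Go's select picks one at random, so a kill attempt on an already-exited process has to be harmless.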
err := i.server.closeIcebergClient()
if err != nil {
    logger.Errorf("Thread[%s]: error closing Iceberg client: %s", i.options.ThreadID, err)
if cleanupErr := i.writer.Cleanup(); cleanupErr != nil {
Collaborator
Do we still need this to be separate?
private static final String FILE_TYPE_EQUALITY_DELETE = "equalityDelete";
private static final String FILE_TYPE_POSITIONAL_DELETE = "positionalDelete";

private final Catalog icebergCatalog;
Collaborator
Can we remove the unnecessary variables?
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BooleanSupplier;
session.icebergTable.refresh();
String commitState = session.op.getCommitState(session.icebergTable);
sendResponse(responseObserver, session.icebergTable.schema().toString(),
    commitState != null ? commitState : "");
Collaborator
Don't we send some text if there is no state?
// DROP_TABLE carries "db.table" in destTableName and must NOT create
// a per-thread session (computeIfAbsent would load/create the very
// table we're about to drop). Handle it before session setup.
if (request.getType() == IcebergPayload.PayloadType.DROP_TABLE) {
Collaborator
Why do we need to handle it separately?
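The ordering concern in the code comment above can be illustrated with a toy dispatcher. It is written in Go for consistency with the other sketches here (the real handler is Java), and every name in it (`handler`, `request`, `loaded`, `dropped`) is hypothetical. It assumes, as the comment states, that creating a session has the side effect of loading or creating the destination table.

```go
package main

import "fmt"

type payloadType int

const (
	records payloadType = iota
	dropTable
)

// request is a hypothetical stand-in for the gRPC payload.
type request struct {
	typ       payloadType
	threadID  string
	destTable string
}

// handler is a toy model of the server side. It is not safe for
// concurrent use; a real server would need synchronization.
type handler struct {
	sessions map[string]string // threadID -> table bound to that session
	loaded   []string          // tables loaded/created by session setup
	dropped  []string          // tables dropped
}

func newHandler() *handler {
	return &handler{sessions: make(map[string]string)}
}

func (h *handler) handle(r request) {
	// Handle DROP_TABLE before any session lookup, so that session setup
	// never loads or creates the table that is about to be dropped.
	if r.typ == dropTable {
		h.dropped = append(h.dropped, r.destTable)
		return
	}
	// Every other request goes through get-or-create session setup.
	if _, ok := h.sessions[r.threadID]; !ok {
		h.sessions[r.threadID] = r.destTable
		h.loaded = append(h.loaded, r.destTable)
	}
}

func main() {
	h := newHandler()
	h.handle(request{typ: dropTable, threadID: "t1", destTable: "db.old"})
	h.handle(request{typ: records, threadID: "t1", destTable: "db.new"})
	fmt.Println("sessions:", len(h.sessions), "loaded:", h.loaded, "dropped:", h.dropped)
}
```

If the drop branch came after the get-or-create step, the first call would have registered a session for `t1` and loaded `db.old` just to drop it.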
}
}

private static List<Map<String, String>> toPartitionList(List<IcebergPayload.PartitionField> protos) {
Collaborator
Any reason for this addition?
// CLOSE_SESSION: just drop the session. No closeQuietly(); it would
// clear op.filesToCommit while an in-flight REGISTER_AND_COMMIT may be
// using that list. Nothing to close here; the session is GC'd.
if (request.getType() == ArrowPayload.PayloadType.CLOSE_SESSION) {
Collaborator
We don't need this in multiple places, right?
Description
This PR refactors the Iceberg writer architecture to use a single shared JVM per OLake CLI invocation instead of creating one JVM per writer thread/chunk.
Previously, every Iceberg writer initialization spawned a dedicated JVM-backed Iceberg client and gRPC server, resulting in significant memory overhead under concurrent backfill and CDC workloads. Large syncs could create many JVM processes, leading to excessive memory consumption and potential OOM issues.
Changes
- Introduced a shared JVM architecture where all streams and chunks communicate with a single JVM instance.
- Added `ThreadSession`-based isolation to maintain per-stream/per-chunk state within the shared JVM.
- Moved stream-specific configuration from JVM startup arguments to gRPC request metadata:
- Added `StreamMetaCtx` on the Go side to propagate stream-specific metadata with every request.
- Added `ThreadSession` management in Java to maintain isolated:
- Added `CLOSE_SESSION` RPC operation for explicit session cleanup and resource release.
- Retained catalog initialization and other truly global resources at the JVM level.
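The overall shape of these changes can be sketched in a few lines of Go. This is a simplified model under stated assumptions, not the PR's implementation: `sharedServer`, `streamMeta`, `getServer`, and `session` are hypothetical names, and the "server" here is an in-process struct standing in for the single JVM behind gRPC. It shows one-time startup, per-thread state keyed by the thread ID carried in request metadata, and explicit session cleanup.

```go
package main

import (
	"fmt"
	"sync"
)

// streamMeta is a hypothetical stand-in for the stream-specific metadata
// that travels with every request instead of being a JVM startup argument.
type streamMeta struct {
	ThreadID  string
	DestTable string
}

// session holds state isolated to one thread/chunk.
type session struct {
	meta    streamMeta
	records int
}

// sharedServer stands in for the single JVM shared by all writers.
type sharedServer struct {
	mu       sync.Mutex
	sessions map[string]*session
}

var (
	once   sync.Once
	server *sharedServer
	starts int // counts how many times the server was actually started
)

// getServer starts the shared server on first use and reuses it afterwards.
func getServer() *sharedServer {
	once.Do(func() {
		starts++
		server = &sharedServer{sessions: make(map[string]*session)}
	})
	return server
}

// Write routes on meta.ThreadID, creating the session on first use, and
// returns the running record count for that session.
func (s *sharedServer) Write(meta streamMeta, n int) int {
	s.mu.Lock()
	defer s.mu.Unlock()
	sess, ok := s.sessions[meta.ThreadID]
	if !ok {
		sess = &session{meta: meta}
		s.sessions[meta.ThreadID] = sess
	}
	sess.records += n
	return sess.records
}

// CloseSession drops the per-thread state; the server itself stays up.
func (s *sharedServer) CloseSession(threadID string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.sessions, threadID)
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			meta := streamMeta{ThreadID: fmt.Sprintf("thread-%d", id), DestTable: "db.table"}
			getServer().Write(meta, 10)
		}(i)
	}
	wg.Wait()
	fmt.Println("server starts:", starts, "sessions:", len(getServer().sessions))
}
```

Four concurrent writers share one server instance and end up with four independent sessions, which mirrors the memory argument in the description: the expensive resource is created once, and only lightweight per-thread state scales with concurrency.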
Benefits
Fixes # (issue)
Type of change
How Has This Been Tested?
CLOSE_SESSION.
Screenshots or Recordings
N/A
Documentation
Related PR's (If Any):
N/A