Skip to content

Commit bd01824

Browse files
authored
fix(compute-engine/local): Honor field_mapping on join keys in dedup + join nodes (#6395)
* fix: Apply field mapping to join keys in local compute engine nodes

  When a batch source defines a `field_mapping` that renames an entity join key (e.g. `USERID` -> `user_id`), the source-read node renames the columns on the pulled Arrow table to their mapped names. Downstream `LocalDedupNode` and `LocalJoinNode` then look up the *pre-mapping* names from `column_info.join_keys`, which raises `KeyError: Index(['USERID'])` during materialization (or returns an empty join).

  Add a `join_keys_columns` property on `ColumnInfo` that mirrors the existing `timestamp_column` / `created_timestamp_column` properties — returning join keys translated through `field_mapping` — and use it from the dedup and join nodes.

  Fixes #5942.

  Signed-off-by: 1fanwang <1fannnw@gmail.com>

* test: also cover LocalJoinNode field_mapping case

  Signed-off-by: 1fanwang <1fannnw@gmail.com>

---------

Signed-off-by: 1fanwang <1fannnw@gmail.com>
1 parent e362173 commit bd01824

3 files changed

Lines changed: 138 additions & 4 deletions

File tree

sdk/python/feast/infra/compute_engines/dag/context.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,18 @@ def created_timestamp_column(self) -> Optional[str]:
4242
"""
4343
return self._get_mapped_column(self.created_ts_col)
4444

45+
@property
def join_keys_columns(self) -> List[str]:
    """
    Return the join keys under their post-rename column names.

    Every join key that appears as a key in ``field_mapping`` is replaced
    by its mapped name; keys without an entry are returned unchanged. When
    ``field_mapping`` is empty or unset, a plain list copy of ``join_keys``
    is returned.

    Use this when looking up columns on a DataFrame whose source columns
    have already been renamed (e.g. inside DAG nodes that consume the
    output of a source-read node).
    """
    renames = self.field_mapping
    if renames:
        # Fall back to the raw key when no rename is registered for it.
        return [renames.get(name, name) for name in self.join_keys]
    return list(self.join_keys)
56+
4557
def _get_mapped_column(self, column: Optional[str]) -> Optional[str]:
4658
"""
4759
Helper method to get the mapped column name if it exists in field_mapping.

sdk/python/feast/infra/compute_engines/local/nodes.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,14 +79,18 @@ def execute(self, context: ExecutionContext) -> ArrowTableValue:
7979
for val in input_values:
8080
val.assert_format(DAGFormat.ARROW)
8181

82+
# The upstream source-read node has already renamed columns via
83+
# field_mapping, so use the mapped join keys for joining (see #5942).
84+
join_keys = self.column_info.join_keys_columns
85+
8286
# Convert all upstream ArrowTables to backend DataFrames
8387
joined_df = self.backend.from_arrow(input_values[0].data)
8488
for val in input_values[1:]:
8589
next_df = self.backend.from_arrow(val.data)
8690
joined_df = self.backend.join(
8791
joined_df,
8892
next_df,
89-
on=self.column_info.join_keys,
93+
on=join_keys,
9094
how=self.how,
9195
)
9296

@@ -105,7 +109,7 @@ def execute(self, context: ExecutionContext) -> ArrowTableValue:
105109
joined_df = self.backend.join(
106110
entity_df,
107111
joined_df,
108-
on=self.column_info.join_keys,
112+
on=join_keys,
109113
how="left",
110114
)
111115

@@ -193,8 +197,10 @@ def execute(self, context: ExecutionContext) -> ArrowTableValue:
193197

194198
# Extract join_keys, timestamp, and created_ts from context
195199

196-
# Dedup strategy: sort and drop_duplicates
197-
dedup_keys = self.column_info.join_keys
200+
# Dedup strategy: sort and drop_duplicates. Use the mapped join key
201+
# names so we look up the columns that the source-read node has
202+
# already renamed (see issue #5942).
203+
dedup_keys = self.column_info.join_keys_columns
198204
if dedup_keys:
199205
sort_keys = [self.column_info.timestamp_column]
200206
if (

sdk/python/tests/unit/infra/compute_engines/local/test_nodes.py

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,122 @@ def test_local_dedup_node():
186186
assert set(df_result["entity_id"]) == {1, 2}
187187

188188

189+
def test_local_dedup_node_with_field_mapping_on_join_key():
    """Regression test: dedup must resolve join keys through field_mapping.

    The source-read node renames columns via field_mapping (e.g. ``USERID`` -> ``user_id``)
    before handing the table to downstream nodes. If the dedup node used the raw
    ``column_info.join_keys`` it would look up the pre-mapping name and raise
    ``KeyError(['USERID'])``.

    See https://github.com/feast-dev/feast/issues/5942.
    """
    # Stand-in for a source-read node output: columns already carry the mapped names.
    source_df = pd.DataFrame(
        {
            "user_id": [1, 1, 2],
            "value": [100, 200, 300],
            "event_timestamp": [
                now - timedelta(seconds=1),
                now,
                now,
            ],
        }
    )
    source_value = ArrowTableValue(pa.Table.from_pandas(source_df))
    context = create_context(node_outputs={"source": source_value})

    # join_keys / ts_col hold the raw source column names; field_mapping maps
    # them to the user-facing names the source-read node has already applied.
    column_info = ColumnInfo(
        join_keys=["USERID"],
        feature_cols=["value"],
        ts_col="EVENT_TIMESTAMP",
        created_ts_col=None,
        field_mapping={"USERID": "user_id", "EVENT_TIMESTAMP": "event_timestamp"},
    )
    node = LocalDedupNode(name="dedup", backend=backend, column_info=column_info)
    node.add_input(MagicMock())
    node.inputs[0].name = "source"

    deduped = node.execute(context).data.to_pandas()

    assert deduped.shape[0] == 2
    assert set(deduped["user_id"]) == {1, 2}
237+
238+
239+
def test_local_join_node_with_field_mapping_on_join_key():
    """Regression test: join must resolve join keys through field_mapping.

    The source-read node renames columns via field_mapping (e.g. ``USERID`` -> ``user_id``)
    before handing the table to downstream nodes. If the join node used the raw
    ``column_info.join_keys`` it would call ``backend.join(..., on=["USERID"], ...)`` and
    raise ``KeyError(['USERID'])`` because the columns have already been renamed.

    See https://github.com/feast-dev/feast/issues/5942.
    """
    # Stand-ins for two source-read node outputs: columns already carry the mapped names.
    frames = {
        "left": pd.DataFrame(
            {
                "user_id": [1, 2],
                "value": [10, 20],
                "event_timestamp": [now, now],
            }
        ),
        "right": pd.DataFrame(
            {
                "user_id": [1, 2],
                "other_value": [100, 200],
                "event_timestamp": [now, now],
            }
        ),
    }

    context = create_context(
        node_outputs={
            "left": ArrowTableValue(pa.Table.from_pandas(frames["left"])),
            "right": ArrowTableValue(pa.Table.from_pandas(frames["right"])),
        }
    )
    # Bypass the trailing entity_df join — this test exercises the input-table
    # join path that consumed the raw (unmapped) join keys before the fix.
    context.entity_df = None

    # join_keys / ts_col hold the raw source column names; field_mapping maps
    # them to the user-facing names the source-read node has already applied.
    column_info = ColumnInfo(
        join_keys=["USERID"],
        feature_cols=["value", "other_value"],
        ts_col="EVENT_TIMESTAMP",
        created_ts_col=None,
        field_mapping={"USERID": "user_id", "EVENT_TIMESTAMP": "event_timestamp"},
    )
    join_node = LocalJoinNode(name="join", backend=backend, column_info=column_info)

    # MagicMock(name=...) would name the mock itself, so set the attribute afterwards.
    left_input = MagicMock()
    left_input.name = "left"
    right_input = MagicMock()
    right_input.name = "right"
    join_node.add_input(left_input)
    join_node.add_input(right_input)

    joined = join_node.execute(context).data.to_pandas()

    assert joined.shape[0] == 2
    assert set(joined["user_id"]) == {1, 2}
    assert "value" in joined.columns
    assert "other_value" in joined.columns
303+
304+
189305
def test_local_transformation_node():
190306
context = create_context(
191307
node_outputs={"source": ArrowTableValue(pa.Table.from_pandas(sample_df))}

0 commit comments

Comments
 (0)