From 2ca9b8631669959a244de6429f7893d573859175 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Tue, 3 Mar 2026 17:04:42 -0800 Subject: [PATCH] runtime, graph: fix VID collisions in ipfs.map() callbacks Each ipfs.map() callback created a fresh BlockState with vid_seq reset to RESERVED_VIDS. When multiple callbacks write to the same entity table, they all generate vids starting at (block<<32)+100, producing duplicates. Fix by threading vid_seq through the callback loop and updating it in EntityCache::extend() so the parent handler also continues from the right sequence after merging callback results. Addresses same underlying issue as PR https://github.com/graphprotocol/graph-node/pull/6336 --- graph/src/components/store/entity_cache.rs | 3 +++ runtime/wasm/src/host_exports.rs | 12 ++++++++++-- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/graph/src/components/store/entity_cache.rs b/graph/src/components/store/entity_cache.rs index 8e6a16bc7a1..ca5493d78e7 100644 --- a/graph/src/components/store/entity_cache.rs +++ b/graph/src/components/store/entity_cache.rs @@ -457,6 +457,9 @@ impl EntityCache { for (key, op) in other.updates { self.entity_op(key, op); } + // Carry forward vid_seq to prevent VID collisions when the caller + // continues writing entities after merging. + self.vid_seq = self.vid_seq.max(other.vid_seq); } /// Generate an id. diff --git a/runtime/wasm/src/host_exports.rs b/runtime/wasm/src/host_exports.rs index 0d9c0d2b3bc..d72dd21e99c 100644 --- a/runtime/wasm/src/host_exports.rs +++ b/runtime/wasm/src/host_exports.rs @@ -537,7 +537,7 @@ impl HostExports { let host_metrics = wasm_ctx.host_metrics.clone(); let valid_module = wasm_ctx.valid_module.clone(); - let ctx = wasm_ctx.ctx.derive_with_empty_block_state(); + let mut ctx = wasm_ctx.ctx.derive_with_empty_block_state(); let callback = callback.to_owned(); // Create a base error message to avoid borrowing headaches let errmsg = format!( @@ -560,9 +560,14 @@ impl HostExports { let mut v = Vec::new(); while let Some(sv) = stream.next().await { let sv = sv?; + let mut derived = ctx.derive_with_empty_block_state(); + // Continue the vid sequence from the previous callback so + // each iteration doesn't reset to RESERVED_VIDS and + // produce duplicate VIDs. + derived.state.entity_cache.vid_seq = ctx.state.entity_cache.vid_seq; let module = WasmInstance::from_valid_module_with_ctx_boxed( valid_module.clone(), - ctx.derive_with_empty_block_state(), + derived, host_metrics.clone(), wasm_ctx.experimental_features, ) @@ -570,6 +575,9 @@ impl HostExports { let result = module .handle_json_callback(&callback, &sv.value, &user_data) .await?; + // Carry forward vid_seq so the next iteration continues + // the sequence. + ctx.state.entity_cache.vid_seq = result.entity_cache.vid_seq; // Log progress every 15s if last_log.elapsed() > Duration::from_secs(15) { debug!(