Skip to main content

hydro_lang/compile/ir/
mod.rs

1use core::panic;
2use std::cell::RefCell;
3use std::collections::HashMap;
4#[cfg(feature = "build")]
5use std::collections::HashSet;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use slotmap::{SecondaryMap, SparseSecondaryMap};
21#[cfg(feature = "build")]
22use syn::parse_quote;
23use syn::visit::{self, Visit};
24use syn::visit_mut::VisitMut;
25
26#[cfg(feature = "build")]
27use crate::compile::builder::ClockId;
28use crate::compile::builder::{CycleId, ExternalPortId};
29#[cfg(feature = "build")]
30use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
31use crate::location::dynamic::LocationId;
32use crate::location::{LocationKey, NetworkHint};
33
34pub mod backtrace;
35use backtrace::Backtrace;
36
37/// Wrapper that displays only the tokens of a parsed expr.
38///
39/// Boxes `syn::Type` which is ~240 bytes.
40#[derive(Clone, Hash)]
41pub struct DebugExpr(pub Box<syn::Expr>);
42
43impl serde::Serialize for DebugExpr {
44    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
45        serializer.serialize_str(&self.to_string())
46    }
47}
48
49impl From<syn::Expr> for DebugExpr {
50    fn from(expr: syn::Expr) -> Self {
51        Self(Box::new(expr))
52    }
53}
54
55impl Deref for DebugExpr {
56    type Target = syn::Expr;
57
58    fn deref(&self) -> &Self::Target {
59        &self.0
60    }
61}
62
63impl ToTokens for DebugExpr {
64    fn to_tokens(&self, tokens: &mut TokenStream) {
65        self.0.to_tokens(tokens);
66    }
67}
68
69impl Debug for DebugExpr {
70    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71        write!(f, "{}", self.0.to_token_stream())
72    }
73}
74
75impl Display for DebugExpr {
76    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77        let original = self.0.as_ref().clone();
78        let simplified = simplify_q_macro(original);
79
80        // For now, just use quote formatting without trying to parse as a statement
81        // This avoids the syn::parse_quote! issues entirely
82        write!(f, "q!({})", quote::quote!(#simplified))
83    }
84}
85
86/// Simplify expanded q! macro calls back to q!(...) syntax for better readability
87fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
88    // Try to parse the token string as a syn::Expr
89    // Use a visitor to simplify q! macro expansions
90    let mut simplifier = QMacroSimplifier::new();
91    simplifier.visit_expr_mut(&mut expr);
92
93    // If we found and simplified a q! macro, return the simplified version
94    if let Some(simplified) = simplifier.simplified_result {
95        simplified
96    } else {
97        expr
98    }
99}
100
101/// AST visitor that simplifies q! macro expansions
102#[derive(Default)]
103pub struct QMacroSimplifier {
104    pub simplified_result: Option<syn::Expr>,
105}
106
107impl QMacroSimplifier {
108    pub fn new() -> Self {
109        Self::default()
110    }
111}
112
113impl VisitMut for QMacroSimplifier {
114    fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
115        // Check if we already found a result to avoid further processing
116        if self.simplified_result.is_some() {
117            return;
118        }
119
120        if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
121            // Look for calls to stageleft::runtime_support::fn*
122            && self.is_stageleft_runtime_support_call(&path_expr.path)
123            // Try to extract the closure from the arguments
124            && let Some(closure) = self.extract_closure_from_args(&call.args)
125        {
126            self.simplified_result = Some(closure);
127            return;
128        }
129
130        // Continue visiting child expressions using the default implementation
131        // Use the default visitor to avoid infinite recursion
132        syn::visit_mut::visit_expr_mut(self, expr);
133    }
134}
135
136impl QMacroSimplifier {
137    fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
138        // Check if this is a call to stageleft::runtime_support::fn*
139        if let Some(last_segment) = path.segments.last() {
140            let fn_name = last_segment.ident.to_string();
141            // if fn_name.starts_with("fn") && fn_name.contains("_expr") {
142            fn_name.contains("_type_hint")
143                && path.segments.len() > 2
144                && path.segments[0].ident == "stageleft"
145                && path.segments[1].ident == "runtime_support"
146        } else {
147            false
148        }
149    }
150
151    fn extract_closure_from_args(
152        &self,
153        args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
154    ) -> Option<syn::Expr> {
155        // Look through the arguments for a closure expression
156        for arg in args {
157            if let syn::Expr::Closure(_) = arg {
158                return Some(arg.clone());
159            }
160            // Also check for closures nested in other expressions (like blocks)
161            if let Some(closure_expr) = self.find_closure_in_expr(arg) {
162                return Some(closure_expr);
163            }
164        }
165        None
166    }
167
168    fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
169        let mut visitor = ClosureFinder {
170            found_closure: None,
171            prefer_inner_blocks: true,
172        };
173        visitor.visit_expr(expr);
174        visitor.found_closure
175    }
176}
177
178/// Visitor that finds closures in expressions with special block handling
179struct ClosureFinder {
180    found_closure: Option<syn::Expr>,
181    prefer_inner_blocks: bool,
182}
183
184impl<'ast> Visit<'ast> for ClosureFinder {
185    fn visit_expr(&mut self, expr: &'ast syn::Expr) {
186        // If we already found a closure, don't continue searching
187        if self.found_closure.is_some() {
188            return;
189        }
190
191        match expr {
192            syn::Expr::Closure(_) => {
193                self.found_closure = Some(expr.clone());
194            }
195            syn::Expr::Block(block) if self.prefer_inner_blocks => {
196                // Special handling for blocks - look for inner blocks that contain closures
197                for stmt in &block.block.stmts {
198                    if let syn::Stmt::Expr(stmt_expr, _) = stmt
199                        && let syn::Expr::Block(_) = stmt_expr
200                    {
201                        // Check if this nested block contains a closure
202                        let mut inner_visitor = ClosureFinder {
203                            found_closure: None,
204                            prefer_inner_blocks: false, // Avoid infinite recursion
205                        };
206                        inner_visitor.visit_expr(stmt_expr);
207                        if inner_visitor.found_closure.is_some() {
208                            // Found a closure in an inner block, return that block
209                            self.found_closure = Some(stmt_expr.clone());
210                            return;
211                        }
212                    }
213                }
214
215                // If no inner block with closure found, continue with normal visitation
216                visit::visit_expr(self, expr);
217
218                // If we found a closure, just return the closure itself, not the whole block
219                // unless we're in the special case where we want the containing block
220                if self.found_closure.is_some() {
221                    // The closure was found during visitation, no need to wrap in block
222                }
223            }
224            _ => {
225                // Use default visitor behavior for all other expressions
226                visit::visit_expr(self, expr);
227            }
228        }
229    }
230}
231
232/// Debug displays the type's tokens.
233///
234/// Boxes `syn::Type` which is ~320 bytes.
235#[derive(Clone, PartialEq, Eq, Hash)]
236pub struct DebugType(pub Box<syn::Type>);
237
238impl From<syn::Type> for DebugType {
239    fn from(t: syn::Type) -> Self {
240        Self(Box::new(t))
241    }
242}
243
244impl Deref for DebugType {
245    type Target = syn::Type;
246
247    fn deref(&self) -> &Self::Target {
248        &self.0
249    }
250}
251
252impl ToTokens for DebugType {
253    fn to_tokens(&self, tokens: &mut TokenStream) {
254        self.0.to_tokens(tokens);
255    }
256}
257
258impl Debug for DebugType {
259    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
260        write!(f, "{}", self.0.to_token_stream())
261    }
262}
263
264impl serde::Serialize for DebugType {
265    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
266        serializer.serialize_str(&format!("{}", self.0.to_token_stream()))
267    }
268}
269
270fn serialize_backtrace_as_span<S: serde::Serializer>(
271    backtrace: &Backtrace,
272    serializer: S,
273) -> Result<S::Ok, S::Error> {
274    match backtrace.format_span() {
275        Some(span) => serializer.serialize_some(&span),
276        None => serializer.serialize_none(),
277    }
278}
279
280fn serialize_ident<S: serde::Serializer>(
281    ident: &syn::Ident,
282    serializer: S,
283) -> Result<S::Ok, S::Error> {
284    serializer.serialize_str(&ident.to_string())
285}
286
287pub enum DebugInstantiate {
288    Building,
289    Finalized(Box<DebugInstantiateFinalized>),
290}
291
292impl serde::Serialize for DebugInstantiate {
293    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
294        match self {
295            DebugInstantiate::Building => {
296                serializer.serialize_unit_variant("DebugInstantiate", 0, "Building")
297            }
298            DebugInstantiate::Finalized(_) => {
299                panic!(
300                    "cannot serialize DebugInstantiate::Finalized: contains non-serializable runtime state (closures)"
301                )
302            }
303        }
304    }
305}
306
307#[cfg_attr(
308    not(feature = "build"),
309    expect(
310        dead_code,
311        reason = "sink, source unused without `feature = \"build\"`."
312    )
313)]
314pub struct DebugInstantiateFinalized {
315    sink: syn::Expr,
316    source: syn::Expr,
317    connect_fn: Option<Box<dyn FnOnce()>>,
318}
319
320impl From<DebugInstantiateFinalized> for DebugInstantiate {
321    fn from(f: DebugInstantiateFinalized) -> Self {
322        Self::Finalized(Box::new(f))
323    }
324}
325
326impl Debug for DebugInstantiate {
327    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
328        write!(f, "<network instantiate>")
329    }
330}
331
332impl Hash for DebugInstantiate {
333    fn hash<H: Hasher>(&self, _state: &mut H) {
334        // Do nothing
335    }
336}
337
338impl Clone for DebugInstantiate {
339    fn clone(&self) -> Self {
340        match self {
341            DebugInstantiate::Building => DebugInstantiate::Building,
342            DebugInstantiate::Finalized(_) => {
343                panic!("DebugInstantiate::Finalized should not be cloned")
344            }
345        }
346    }
347}
348
349/// Tracks the instantiation state of a `ClusterMembers` source.
350///
351/// During `compile_network`, the first `ClusterMembers` node for a given
352/// `(at_location, target_cluster)` pair is promoted to [`Self::Stream`] and
353/// receives the expression returned by `Deploy::cluster_membership_stream`.
354/// All subsequent nodes for the same pair are set to [`Self::Tee`] so that
355/// during code-gen they simply reference the tee output of the first node
356/// instead of creating a redundant `source_stream`.
357#[derive(Debug, Hash, Clone, serde::Serialize)]
358pub enum ClusterMembersState {
359    /// Not yet instantiated.
360    Uninit,
361    /// The primary instance: holds the stream expression and will emit
362    /// `source_stream(expr) -> tee()` during code-gen.
363    Stream(DebugExpr),
364    /// A secondary instance that references the tee output of the primary.
365    /// Stores `(at_location_root, target_cluster_location)` so that `emit_core`
366    /// can derive the deterministic tee ident without extra state.
367    Tee(LocationId, LocationId),
368}
369
370/// A source in a Hydro graph, where data enters the graph.
371#[derive(Debug, Hash, Clone, serde::Serialize)]
372pub enum HydroSource {
373    Stream(DebugExpr),
374    ExternalNetwork(),
375    Iter(DebugExpr),
376    Spin(),
377    ClusterMembers(LocationId, ClusterMembersState),
378    Embedded(#[serde(serialize_with = "serialize_ident")] syn::Ident),
379    EmbeddedSingleton(#[serde(serialize_with = "serialize_ident")] syn::Ident),
380}
381
/// Abstraction over the parts of DFIR code-gen that differ between production deployment
/// and simulation.
///
/// In particular, it allows the simulator to fuse all locations into a single DFIR graph,
/// emit a separate graph for each tick, and emit hooks for controlling non-deterministic
/// operators.
#[cfg(feature = "build")]
pub trait DfirBuilder {
    /// Whether the representation of singletons should include intermediate states.
    fn singleton_intermediates(&self) -> bool;

    /// Gets the DFIR builder for the given location, creating it if necessary.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;

    /// Code-gen hook for a `batch` step that reads `in_ident` (of kind `in_kind`, at
    /// `in_location`) and defines `out_ident` at `out_location`.
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );

    /// Code-gen hook for yielding a collection out of a tick: reads `in_ident` at
    /// `in_location` and defines `out_ident` at `out_location`.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
    );

    /// Code-gen hook for entering an atomic section: reads `in_ident` at `in_location`
    /// and defines `out_ident` at `out_location`.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );

    /// Code-gen hook for leaving an atomic section: reads `in_ident` at `in_location`
    /// and defines `out_ident`.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    );

    /// Code-gen hook for a point where non-determinism is observed at `location`: reads
    /// `in_ident` (of kind `in_kind`) and defines `out_ident` (of kind `out_kind`).
    #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
    fn observe_nondet(
        &mut self,
        trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
    );

    /// Code-gen hook for a network channel from `from` to `to`.
    ///
    /// `input_ident` is the data to send through `sink`; `out_ident` is defined on the
    /// receiving side from `source`. `serialize` and `deserialize` are optional mapping
    /// expressions for the two sides, and `tag_id` distinguishes this channel's operators.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
        networking_info: &crate::networking::NetworkingInfo,
    );

    /// Code-gen hook for an external input at location `on`: defines `out_ident` from
    /// `source_expr`, optionally mapped through `deserialize`.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
    );

    /// Code-gen hook for an external output at location `on`: sends `input_ident` to
    /// `sink_expr`, optionally mapped through `serialize` first.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: usize,
    );
}
475
/// [`DfirBuilder`] that keeps one [`FlatGraphBuilder`] per root location key.
#[cfg(feature = "build")]
impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
    /// Always `false` for this builder.
    fn singleton_intermediates(&self) -> bool {
        false
    }

    /// Returns the builder stored under the root key of `location`, inserting a default
    /// one if the slot is empty.
    ///
    /// # Panics
    ///
    /// Panics with "location was removed" if the map's `entry` lookup yields `None`.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
        self.entry(location.root().key())
            .expect("location was removed")
            .or_default()
    }

    /// Emits `out = in -> persist::<'static>()` for bounded singleton-like inputs
    /// (`Singleton`, `Optional`, `KeyedSingleton`) and a plain alias otherwise.
    ///
    /// # Panics
    ///
    /// In the persist case, panics if `in_location` is not top-level.
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        let builder = self.get_dfir_mut(in_location.root());

        let needs_persist = in_kind.is_bounded()
            && matches!(
                in_kind,
                CollectionKind::Singleton { .. }
                    | CollectionKind::Optional { .. }
                    | CollectionKind::KeyedSingleton { .. }
            );

        if needs_persist {
            assert!(in_location.is_top_level());
            builder.add_dfir(
                parse_quote! {
                    #out_ident = #in_ident -> persist::<'static>();
                },
                None,
                None,
            );
        } else {
            add_ident_alias(builder, &in_ident, out_ident);
        }
    }

    /// Emits a plain alias `out = in` in the builder for `in_location`'s root.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
    ) {
        add_ident_alias(self.get_dfir_mut(in_location.root()), &in_ident, out_ident);
    }

    /// Emits a plain alias `out = in` in the builder for `in_location`'s root.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        add_ident_alias(self.get_dfir_mut(in_location.root()), &in_ident, out_ident);
    }

    /// Emits a plain alias `out = in` in the builder for `in_location`'s root.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    ) {
        add_ident_alias(self.get_dfir_mut(in_location.root()), &in_ident, out_ident);
    }

    /// Emits a plain alias `out = in` in the builder for `location`.
    fn observe_nondet(
        &mut self,
        _trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
    ) {
        add_ident_alias(self.get_dfir_mut(location), &in_ident, out_ident);
    }

    /// Emits the send half (`... -> dest_sink(sink)`) into the builder for `from` and the
    /// receive half (`out = source_stream(source) ...`) into the builder for `to`.
    ///
    /// When present, `serialize` / `deserialize` are inserted as `map(...)` stages.
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
        _networking_info: &crate::networking::NetworkingInfo,
    ) {
        // operator tag separates send and receive, which otherwise have the same next_stmt_id
        let send_tag = format!("send{tag_id}");
        let recv_tag = format!("recv{tag_id}");

        let sender_builder = self.get_dfir_mut(from);
        if let Some(serialize_pipeline) = serialize {
            sender_builder.add_dfir(
                parse_quote! {
                    #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink);
                },
                None,
                Some(&send_tag),
            );
        } else {
            sender_builder.add_dfir(
                parse_quote! {
                    #input_ident -> dest_sink(#sink);
                },
                None,
                Some(&send_tag),
            );
        }

        let receiver_builder = self.get_dfir_mut(to);
        if let Some(deserialize_pipeline) = deserialize {
            receiver_builder.add_dfir(
                parse_quote! {
                    #out_ident = source_stream(#source) -> map(#deserialize_pipeline);
                },
                None,
                Some(&recv_tag),
            );
        } else {
            receiver_builder.add_dfir(
                parse_quote! {
                    #out_ident = source_stream(#source);
                },
                None,
                Some(&recv_tag),
            );
        }
    }

    /// Emits `out = source_stream(source_expr)` into the builder for `on`, followed by
    /// `map(deserialize)` when a deserializer is given. Tagged `recv{tag_id}`.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
    ) {
        let recv_tag = format!("recv{tag_id}");

        let receiver_builder = self.get_dfir_mut(on);
        if let Some(deserialize_pipeline) = deserialize {
            receiver_builder.add_dfir(
                parse_quote! {
                    #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
                },
                None,
                Some(&recv_tag),
            );
        } else {
            receiver_builder.add_dfir(
                parse_quote! {
                    #out_ident = source_stream(#source_expr);
                },
                None,
                Some(&recv_tag),
            );
        }
    }

    /// Emits `input -> dest_sink(sink_expr)` into the builder for `on`, with a
    /// `map(serialize)` stage in between when a serializer is given. Tagged `send{tag_id}`.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: usize,
    ) {
        // operator tag separates send and receive, which otherwise have the same next_stmt_id
        let send_tag = format!("send{tag_id}");

        let sender_builder = self.get_dfir_mut(on);
        if let Some(serialize_fn) = serialize {
            sender_builder.add_dfir(
                parse_quote! {
                    #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
                },
                None,
                Some(&send_tag),
            );
        } else {
            sender_builder.add_dfir(
                parse_quote! {
                    #input_ident -> dest_sink(#sink_expr);
                },
                None,
                Some(&send_tag),
            );
        }
    }
}

/// Adds the statement `out_ident = in_ident;` to `builder`, with no operator tag.
#[cfg(feature = "build")]
fn add_ident_alias(
    builder: &mut FlatGraphBuilder,
    in_ident: &syn::Ident,
    out_ident: &syn::Ident,
) {
    builder.add_dfir(
        parse_quote! {
            #out_ident = #in_ident;
        },
        None,
        None,
    );
}
709
/// Selects between a borrowed [`DfirBuilder`] and a pair of callbacks over IR elements.
///
/// In the `Callback` case, `L` receives each [`HydroRoot`] and `N` each [`HydroNode`],
/// both mutably and together with a `&mut usize`.
/// NOTE(review): the `usize` is presumably the running statement id — confirm at the
/// call sites.
#[cfg(feature = "build")]
pub enum BuildersOrCallback<'a, L, N>
where
    L: FnMut(&mut HydroRoot, &mut usize),
    N: FnMut(&mut HydroNode, &mut usize),
{
    /// A mutable borrow of the builder in use.
    Builders(&'a mut dyn DfirBuilder),
    /// The root callback followed by the node callback.
    Callback(L, N),
}
719
720/// An root in a Hydro graph, which is an pipeline that doesn't emit
721/// any downstream values. Traversals over the dataflow graph and
722/// generating DFIR IR start from roots.
723#[derive(Debug, Hash, serde::Serialize)]
724pub enum HydroRoot {
725    ForEach {
726        f: DebugExpr,
727        input: Box<HydroNode>,
728        op_metadata: HydroIrOpMetadata,
729    },
730    SendExternal {
731        to_external_key: LocationKey,
732        to_port_id: ExternalPortId,
733        to_many: bool,
734        unpaired: bool,
735        serialize_fn: Option<DebugExpr>,
736        instantiate_fn: DebugInstantiate,
737        input: Box<HydroNode>,
738        op_metadata: HydroIrOpMetadata,
739    },
740    DestSink {
741        sink: DebugExpr,
742        input: Box<HydroNode>,
743        op_metadata: HydroIrOpMetadata,
744    },
745    CycleSink {
746        cycle_id: CycleId,
747        input: Box<HydroNode>,
748        op_metadata: HydroIrOpMetadata,
749    },
750    EmbeddedOutput {
751        #[serde(serialize_with = "serialize_ident")]
752        ident: syn::Ident,
753        input: Box<HydroNode>,
754        op_metadata: HydroIrOpMetadata,
755    },
756    Null {
757        input: Box<HydroNode>,
758        op_metadata: HydroIrOpMetadata,
759    },
760}
761
762impl HydroRoot {
763    #[cfg(feature = "build")]
764    #[expect(clippy::too_many_arguments, reason = "TODO(internal)")]
765    pub fn compile_network<'a, D>(
766        &mut self,
767        extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
768        seen_tees: &mut SeenSharedNodes,
769        seen_cluster_members: &mut HashSet<(LocationId, LocationId)>,
770        processes: &SparseSecondaryMap<LocationKey, D::Process>,
771        clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
772        externals: &SparseSecondaryMap<LocationKey, D::External>,
773        env: &mut D::InstantiateEnv,
774    ) where
775        D: Deploy<'a>,
776    {
777        let refcell_extra_stmts = RefCell::new(extra_stmts);
778        let refcell_env = RefCell::new(env);
779        let refcell_seen_cluster_members = RefCell::new(seen_cluster_members);
780        self.transform_bottom_up(
781            &mut |l| {
782                if let HydroRoot::SendExternal {
783                    input,
784                    to_external_key,
785                    to_port_id,
786                    to_many,
787                    unpaired,
788                    instantiate_fn,
789                    ..
790                } = l
791                {
792                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
793                        DebugInstantiate::Building => {
794                            let to_node = externals
795                                .get(*to_external_key)
796                                .unwrap_or_else(|| {
797                                    panic!("A external used in the graph was not instantiated: {}", to_external_key)
798                                })
799                                .clone();
800
801                            match input.metadata().location_id.root() {
802                                &LocationId::Process(process_key) => {
803                                    if *to_many {
804                                        (
805                                            (
806                                                D::e2o_many_sink(format!("{}_{}", *to_external_key, *to_port_id)),
807                                                parse_quote!(DUMMY),
808                                            ),
809                                            Box::new(|| {}) as Box<dyn FnOnce()>,
810                                        )
811                                    } else {
812                                        let from_node = processes
813                                            .get(process_key)
814                                            .unwrap_or_else(|| {
815                                                panic!("A process used in the graph was not instantiated: {}", process_key)
816                                            })
817                                            .clone();
818
819                                        let sink_port = from_node.next_port();
820                                        let source_port = to_node.next_port();
821
822                                        if *unpaired {
823                                            use stageleft::quote_type;
824                                            use tokio_util::codec::LengthDelimitedCodec;
825
826                                            to_node.register(*to_port_id, source_port.clone());
827
828                                            let _ = D::e2o_source(
829                                                refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
830                                                &to_node, &source_port,
831                                                &from_node, &sink_port,
832                                                &quote_type::<LengthDelimitedCodec>(),
833                                                format!("{}_{}", *to_external_key, *to_port_id)
834                                            );
835                                        }
836
837                                        (
838                                            (
839                                                D::o2e_sink(
840                                                    &from_node,
841                                                    &sink_port,
842                                                    &to_node,
843                                                    &source_port,
844                                                    format!("{}_{}", *to_external_key, *to_port_id)
845                                                ),
846                                                parse_quote!(DUMMY),
847                                            ),
848                                            if *unpaired {
849                                                D::e2o_connect(
850                                                    &to_node,
851                                                    &source_port,
852                                                    &from_node,
853                                                    &sink_port,
854                                                    *to_many,
855                                                    NetworkHint::Auto,
856                                                )
857                                            } else {
858                                                Box::new(|| {}) as Box<dyn FnOnce()>
859                                            },
860                                        )
861                                    }
862                                }
863                                LocationId::Cluster(cluster_key) => {
864                                    let from_node = clusters
865                                        .get(*cluster_key)
866                                        .unwrap_or_else(|| {
867                                            panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
868                                        })
869                                        .clone();
870
871                                    let sink_port = from_node.next_port();
872                                    let source_port = to_node.next_port();
873
874                                    if *unpaired {
875                                        to_node.register(*to_port_id, source_port.clone());
876                                    }
877
878                                    (
879                                        (
880                                            D::m2e_sink(
881                                                &from_node,
882                                                &sink_port,
883                                                &to_node,
884                                                &source_port,
885                                                format!("{}_{}", *to_external_key, *to_port_id)
886                                            ),
887                                            parse_quote!(DUMMY),
888                                        ),
889                                        Box::new(|| {}) as Box<dyn FnOnce()>,
890                                    )
891                                }
892                                _ => panic!()
893                            }
894                        },
895
896                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
897                    };
898
899                    *instantiate_fn = DebugInstantiateFinalized {
900                        sink: sink_expr,
901                        source: source_expr,
902                        connect_fn: Some(connect_fn),
903                    }
904                    .into();
905                } else if let HydroRoot::EmbeddedOutput { ident, input, .. } = l {
906                    let element_type = match &input.metadata().collection_kind {
907                        CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
908                        _ => panic!("Embedded output must have Stream collection kind"),
909                    };
910                    let location_key = match input.metadata().location_id.root() {
911                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
912                        _ => panic!("Embedded output must be on a process or cluster"),
913                    };
914                    D::register_embedded_output(
915                        &mut refcell_env.borrow_mut(),
916                        location_key,
917                        ident,
918                        &element_type,
919                    );
920                }
921            },
922            &mut |n| {
923                if let HydroNode::Network {
924                    name,
925                    networking_info,
926                    input,
927                    instantiate_fn,
928                    metadata,
929                    ..
930                } = n
931                {
932                    let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
933                        DebugInstantiate::Building => instantiate_network::<D>(
934                            &mut refcell_env.borrow_mut(),
935                            input.metadata().location_id.root(),
936                            metadata.location_id.root(),
937                            processes,
938                            clusters,
939                            name.as_deref(),
940                            networking_info,
941                        ),
942
943                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
944                    };
945
946                    *instantiate_fn = DebugInstantiateFinalized {
947                        sink: sink_expr,
948                        source: source_expr,
949                        connect_fn: Some(connect_fn),
950                    }
951                    .into();
952                } else if let HydroNode::ExternalInput {
953                    from_external_key,
954                    from_port_id,
955                    from_many,
956                    codec_type,
957                    port_hint,
958                    instantiate_fn,
959                    metadata,
960                    ..
961                } = n
962                {
963                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
964                        DebugInstantiate::Building => {
965                            let from_node = externals
966                                .get(*from_external_key)
967                                .unwrap_or_else(|| {
968                                    panic!(
969                                        "A external used in the graph was not instantiated: {}",
970                                        from_external_key,
971                                    )
972                                })
973                                .clone();
974
975                            match metadata.location_id.root() {
976                                &LocationId::Process(process_key) => {
977                                    let to_node = processes
978                                        .get(process_key)
979                                        .unwrap_or_else(|| {
980                                            panic!("A process used in the graph was not instantiated: {}", process_key)
981                                        })
982                                        .clone();
983
984                                    let sink_port = from_node.next_port();
985                                    let source_port = to_node.next_port();
986
987                                    from_node.register(*from_port_id, sink_port.clone());
988
989                                    (
990                                        (
991                                            parse_quote!(DUMMY),
992                                            if *from_many {
993                                                D::e2o_many_source(
994                                                    refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
995                                                    &to_node, &source_port,
996                                                    codec_type.0.as_ref(),
997                                                    format!("{}_{}", *from_external_key, *from_port_id)
998                                                )
999                                            } else {
1000                                                D::e2o_source(
1001                                                    refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1002                                                    &from_node, &sink_port,
1003                                                    &to_node, &source_port,
1004                                                    codec_type.0.as_ref(),
1005                                                    format!("{}_{}", *from_external_key, *from_port_id)
1006                                                )
1007                                            },
1008                                        ),
1009                                        D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
1010                                    )
1011                                }
1012                                LocationId::Cluster(cluster_key) => {
1013                                    let to_node = clusters
1014                                        .get(*cluster_key)
1015                                        .unwrap_or_else(|| {
1016                                            panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
1017                                        })
1018                                        .clone();
1019
1020                                    let sink_port = from_node.next_port();
1021                                    let source_port = to_node.next_port();
1022
1023                                    from_node.register(*from_port_id, sink_port.clone());
1024
1025                                    (
1026                                        (
1027                                            parse_quote!(DUMMY),
1028                                            D::e2m_source(
1029                                                refcell_extra_stmts.borrow_mut().entry(*cluster_key).expect("location was removed").or_default(),
1030                                                &from_node, &sink_port,
1031                                                &to_node, &source_port,
1032                                                codec_type.0.as_ref(),
1033                                                format!("{}_{}", *from_external_key, *from_port_id)
1034                                            ),
1035                                        ),
1036                                        D::e2m_connect(&from_node, &sink_port, &to_node, &source_port, *port_hint),
1037                                    )
1038                                }
1039                                _ => panic!()
1040                            }
1041                        },
1042
1043                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1044                    };
1045
1046                    *instantiate_fn = DebugInstantiateFinalized {
1047                        sink: sink_expr,
1048                        source: source_expr,
1049                        connect_fn: Some(connect_fn),
1050                    }
1051                    .into();
1052                } else if let HydroNode::Source { source: HydroSource::Embedded(ident), metadata } = n {
1053                    let element_type = match &metadata.collection_kind {
1054                        CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
1055                        _ => panic!("Embedded source must have Stream collection kind"),
1056                    };
1057                    let location_key = match metadata.location_id.root() {
1058                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
1059                        _ => panic!("Embedded source must be on a process or cluster"),
1060                    };
1061                    D::register_embedded_stream_input(
1062                        &mut refcell_env.borrow_mut(),
1063                        location_key,
1064                        ident,
1065                        &element_type,
1066                    );
1067                } else if let HydroNode::Source { source: HydroSource::EmbeddedSingleton(ident), metadata } = n {
1068                    let element_type = match &metadata.collection_kind {
1069                        CollectionKind::Singleton { element_type, .. } => element_type.0.as_ref().clone(),
1070                        _ => panic!("EmbeddedSingleton source must have Singleton collection kind"),
1071                    };
1072                    let location_key = match metadata.location_id.root() {
1073                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
1074                        _ => panic!("EmbeddedSingleton source must be on a process or cluster"),
1075                    };
1076                    D::register_embedded_singleton_input(
1077                        &mut refcell_env.borrow_mut(),
1078                        location_key,
1079                        ident,
1080                        &element_type,
1081                    );
1082                } else if let HydroNode::Source { source: HydroSource::ClusterMembers(location_id, state), metadata } = n {
1083                    match state {
1084                        ClusterMembersState::Uninit => {
1085                            let at_location = metadata.location_id.root().clone();
1086                            let key = (at_location.clone(), LocationId::Cluster(location_id.key()));
1087                            if refcell_seen_cluster_members.borrow_mut().insert(key) {
1088                                // First occurrence: call cluster_membership_stream and mark as Stream.
1089                                let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
1090                                    D::cluster_membership_stream(&mut refcell_env.borrow_mut(), &at_location, location_id),
1091                                    &(),
1092                                );
1093                                *state = ClusterMembersState::Stream(expr.into());
1094                            } else {
1095                                // Already instantiated for this (at, target) pair: just tee.
1096                                *state = ClusterMembersState::Tee(at_location, location_id.clone());
1097                            }
1098                        }
1099                        ClusterMembersState::Stream(_) | ClusterMembersState::Tee(..) => {
1100                            panic!("cluster members already finalized");
1101                        }
1102                    }
1103                }
1104            },
1105            seen_tees,
1106            false,
1107        );
1108    }
1109
1110    pub fn connect_network(&mut self, seen_tees: &mut SeenSharedNodes) {
1111        self.transform_bottom_up(
1112            &mut |l| {
1113                if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
1114                    match instantiate_fn {
1115                        DebugInstantiate::Building => panic!("network not built"),
1116
1117                        DebugInstantiate::Finalized(finalized) => {
1118                            (finalized.connect_fn.take().unwrap())();
1119                        }
1120                    }
1121                }
1122            },
1123            &mut |n| {
1124                if let HydroNode::Network { instantiate_fn, .. }
1125                | HydroNode::ExternalInput { instantiate_fn, .. } = n
1126                {
1127                    match instantiate_fn {
1128                        DebugInstantiate::Building => panic!("network not built"),
1129
1130                        DebugInstantiate::Finalized(finalized) => {
1131                            (finalized.connect_fn.take().unwrap())();
1132                        }
1133                    }
1134                }
1135            },
1136            seen_tees,
1137            false,
1138        );
1139    }
1140
1141    pub fn transform_bottom_up(
1142        &mut self,
1143        transform_root: &mut impl FnMut(&mut HydroRoot),
1144        transform_node: &mut impl FnMut(&mut HydroNode),
1145        seen_tees: &mut SeenSharedNodes,
1146        check_well_formed: bool,
1147    ) {
1148        self.transform_children(
1149            |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
1150            seen_tees,
1151        );
1152
1153        transform_root(self);
1154    }
1155
1156    pub fn transform_children(
1157        &mut self,
1158        mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
1159        seen_tees: &mut SeenSharedNodes,
1160    ) {
1161        match self {
1162            HydroRoot::ForEach { input, .. }
1163            | HydroRoot::SendExternal { input, .. }
1164            | HydroRoot::DestSink { input, .. }
1165            | HydroRoot::CycleSink { input, .. }
1166            | HydroRoot::EmbeddedOutput { input, .. }
1167            | HydroRoot::Null { input, .. } => {
1168                transform(input, seen_tees);
1169            }
1170        }
1171    }
1172
1173    pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroRoot {
1174        match self {
1175            HydroRoot::ForEach {
1176                f,
1177                input,
1178                op_metadata,
1179            } => HydroRoot::ForEach {
1180                f: f.clone(),
1181                input: Box::new(input.deep_clone(seen_tees)),
1182                op_metadata: op_metadata.clone(),
1183            },
1184            HydroRoot::SendExternal {
1185                to_external_key,
1186                to_port_id,
1187                to_many,
1188                unpaired,
1189                serialize_fn,
1190                instantiate_fn,
1191                input,
1192                op_metadata,
1193            } => HydroRoot::SendExternal {
1194                to_external_key: *to_external_key,
1195                to_port_id: *to_port_id,
1196                to_many: *to_many,
1197                unpaired: *unpaired,
1198                serialize_fn: serialize_fn.clone(),
1199                instantiate_fn: instantiate_fn.clone(),
1200                input: Box::new(input.deep_clone(seen_tees)),
1201                op_metadata: op_metadata.clone(),
1202            },
1203            HydroRoot::DestSink {
1204                sink,
1205                input,
1206                op_metadata,
1207            } => HydroRoot::DestSink {
1208                sink: sink.clone(),
1209                input: Box::new(input.deep_clone(seen_tees)),
1210                op_metadata: op_metadata.clone(),
1211            },
1212            HydroRoot::CycleSink {
1213                cycle_id,
1214                input,
1215                op_metadata,
1216            } => HydroRoot::CycleSink {
1217                cycle_id: *cycle_id,
1218                input: Box::new(input.deep_clone(seen_tees)),
1219                op_metadata: op_metadata.clone(),
1220            },
1221            HydroRoot::EmbeddedOutput {
1222                ident,
1223                input,
1224                op_metadata,
1225            } => HydroRoot::EmbeddedOutput {
1226                ident: ident.clone(),
1227                input: Box::new(input.deep_clone(seen_tees)),
1228                op_metadata: op_metadata.clone(),
1229            },
1230            HydroRoot::Null { input, op_metadata } => HydroRoot::Null {
1231                input: Box::new(input.deep_clone(seen_tees)),
1232                op_metadata: op_metadata.clone(),
1233            },
1234        }
1235    }
1236
1237    #[cfg(feature = "build")]
1238    pub fn emit(
1239        &mut self,
1240        graph_builders: &mut dyn DfirBuilder,
1241        seen_tees: &mut SeenSharedNodes,
1242        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1243        next_stmt_id: &mut usize,
1244    ) {
1245        self.emit_core(
1246            &mut BuildersOrCallback::<
1247                fn(&mut HydroRoot, &mut usize),
1248                fn(&mut HydroNode, &mut usize),
1249            >::Builders(graph_builders),
1250            seen_tees,
1251            built_tees,
1252            next_stmt_id,
1253        );
1254    }
1255
1256    #[cfg(feature = "build")]
1257    pub fn emit_core(
1258        &mut self,
1259        builders_or_callback: &mut BuildersOrCallback<
1260            impl FnMut(&mut HydroRoot, &mut usize),
1261            impl FnMut(&mut HydroNode, &mut usize),
1262        >,
1263        seen_tees: &mut SeenSharedNodes,
1264        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1265        next_stmt_id: &mut usize,
1266    ) {
1267        match self {
1268            HydroRoot::ForEach { f, input, .. } => {
1269                let input_ident =
1270                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1271
1272                match builders_or_callback {
1273                    BuildersOrCallback::Builders(graph_builders) => {
1274                        graph_builders
1275                            .get_dfir_mut(&input.metadata().location_id)
1276                            .add_dfir(
1277                                parse_quote! {
1278                                    #input_ident -> for_each(#f);
1279                                },
1280                                None,
1281                                Some(&next_stmt_id.to_string()),
1282                            );
1283                    }
1284                    BuildersOrCallback::Callback(leaf_callback, _) => {
1285                        leaf_callback(self, next_stmt_id);
1286                    }
1287                }
1288
1289                *next_stmt_id += 1;
1290            }
1291
1292            HydroRoot::SendExternal {
1293                serialize_fn,
1294                instantiate_fn,
1295                input,
1296                ..
1297            } => {
1298                let input_ident =
1299                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1300
1301                match builders_or_callback {
1302                    BuildersOrCallback::Builders(graph_builders) => {
1303                        let (sink_expr, _) = match instantiate_fn {
1304                            DebugInstantiate::Building => (
1305                                syn::parse_quote!(DUMMY_SINK),
1306                                syn::parse_quote!(DUMMY_SOURCE),
1307                            ),
1308
1309                            DebugInstantiate::Finalized(finalized) => {
1310                                (finalized.sink.clone(), finalized.source.clone())
1311                            }
1312                        };
1313
1314                        graph_builders.create_external_output(
1315                            &input.metadata().location_id,
1316                            sink_expr,
1317                            &input_ident,
1318                            serialize_fn.as_ref(),
1319                            *next_stmt_id,
1320                        );
1321                    }
1322                    BuildersOrCallback::Callback(leaf_callback, _) => {
1323                        leaf_callback(self, next_stmt_id);
1324                    }
1325                }
1326
1327                *next_stmt_id += 1;
1328            }
1329
1330            HydroRoot::DestSink { sink, input, .. } => {
1331                let input_ident =
1332                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1333
1334                match builders_or_callback {
1335                    BuildersOrCallback::Builders(graph_builders) => {
1336                        graph_builders
1337                            .get_dfir_mut(&input.metadata().location_id)
1338                            .add_dfir(
1339                                parse_quote! {
1340                                    #input_ident -> dest_sink(#sink);
1341                                },
1342                                None,
1343                                Some(&next_stmt_id.to_string()),
1344                            );
1345                    }
1346                    BuildersOrCallback::Callback(leaf_callback, _) => {
1347                        leaf_callback(self, next_stmt_id);
1348                    }
1349                }
1350
1351                *next_stmt_id += 1;
1352            }
1353
1354            HydroRoot::CycleSink {
1355                cycle_id, input, ..
1356            } => {
1357                let input_ident =
1358                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1359
1360                match builders_or_callback {
1361                    BuildersOrCallback::Builders(graph_builders) => {
1362                        let elem_type: syn::Type = match &input.metadata().collection_kind {
1363                            CollectionKind::KeyedSingleton {
1364                                key_type,
1365                                value_type,
1366                                ..
1367                            }
1368                            | CollectionKind::KeyedStream {
1369                                key_type,
1370                                value_type,
1371                                ..
1372                            } => {
1373                                parse_quote!((#key_type, #value_type))
1374                            }
1375                            CollectionKind::Stream { element_type, .. }
1376                            | CollectionKind::Singleton { element_type, .. }
1377                            | CollectionKind::Optional { element_type, .. } => {
1378                                parse_quote!(#element_type)
1379                            }
1380                        };
1381
1382                        let cycle_id_ident = cycle_id.as_ident();
1383                        graph_builders
1384                            .get_dfir_mut(&input.metadata().location_id)
1385                            .add_dfir(
1386                                parse_quote! {
1387                                    #cycle_id_ident = #input_ident -> identity::<#elem_type>();
1388                                },
1389                                None,
1390                                None,
1391                            );
1392                    }
1393                    // No ID, no callback
1394                    BuildersOrCallback::Callback(_, _) => {}
1395                }
1396            }
1397
1398            HydroRoot::EmbeddedOutput { ident, input, .. } => {
1399                let input_ident =
1400                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1401
1402                match builders_or_callback {
1403                    BuildersOrCallback::Builders(graph_builders) => {
1404                        graph_builders
1405                            .get_dfir_mut(&input.metadata().location_id)
1406                            .add_dfir(
1407                                parse_quote! {
1408                                    #input_ident -> for_each(&mut #ident);
1409                                },
1410                                None,
1411                                Some(&next_stmt_id.to_string()),
1412                            );
1413                    }
1414                    BuildersOrCallback::Callback(leaf_callback, _) => {
1415                        leaf_callback(self, next_stmt_id);
1416                    }
1417                }
1418
1419                *next_stmt_id += 1;
1420            }
1421
1422            HydroRoot::Null { input, .. } => {
1423                let input_ident =
1424                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1425
1426                match builders_or_callback {
1427                    BuildersOrCallback::Builders(graph_builders) => {
1428                        graph_builders
1429                            .get_dfir_mut(&input.metadata().location_id)
1430                            .add_dfir(
1431                                parse_quote! {
1432                                    #input_ident -> for_each(|_| {});
1433                                },
1434                                None,
1435                                Some(&next_stmt_id.to_string()),
1436                            );
1437                    }
1438                    BuildersOrCallback::Callback(leaf_callback, _) => {
1439                        leaf_callback(self, next_stmt_id);
1440                    }
1441                }
1442
1443                *next_stmt_id += 1;
1444            }
1445        }
1446    }
1447
1448    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1449        match self {
1450            HydroRoot::ForEach { op_metadata, .. }
1451            | HydroRoot::SendExternal { op_metadata, .. }
1452            | HydroRoot::DestSink { op_metadata, .. }
1453            | HydroRoot::CycleSink { op_metadata, .. }
1454            | HydroRoot::EmbeddedOutput { op_metadata, .. }
1455            | HydroRoot::Null { op_metadata, .. } => op_metadata,
1456        }
1457    }
1458
1459    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1460        match self {
1461            HydroRoot::ForEach { op_metadata, .. }
1462            | HydroRoot::SendExternal { op_metadata, .. }
1463            | HydroRoot::DestSink { op_metadata, .. }
1464            | HydroRoot::CycleSink { op_metadata, .. }
1465            | HydroRoot::EmbeddedOutput { op_metadata, .. }
1466            | HydroRoot::Null { op_metadata, .. } => op_metadata,
1467        }
1468    }
1469
1470    pub fn input(&self) -> &HydroNode {
1471        match self {
1472            HydroRoot::ForEach { input, .. }
1473            | HydroRoot::SendExternal { input, .. }
1474            | HydroRoot::DestSink { input, .. }
1475            | HydroRoot::CycleSink { input, .. }
1476            | HydroRoot::EmbeddedOutput { input, .. }
1477            | HydroRoot::Null { input, .. } => input,
1478        }
1479    }
1480
1481    pub fn input_metadata(&self) -> &HydroIrMetadata {
1482        self.input().metadata()
1483    }
1484
1485    pub fn print_root(&self) -> String {
1486        match self {
1487            HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1488            HydroRoot::SendExternal { .. } => "SendExternal".to_owned(),
1489            HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1490            HydroRoot::CycleSink { cycle_id, .. } => format!("CycleSink({})", cycle_id),
1491            HydroRoot::EmbeddedOutput { ident, .. } => {
1492                format!("EmbeddedOutput({})", ident)
1493            }
1494            HydroRoot::Null { .. } => "Null".to_owned(),
1495        }
1496    }
1497
1498    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1499        match self {
1500            HydroRoot::ForEach { f, .. } | HydroRoot::DestSink { sink: f, .. } => {
1501                transform(f);
1502            }
1503            HydroRoot::SendExternal { .. }
1504            | HydroRoot::CycleSink { .. }
1505            | HydroRoot::EmbeddedOutput { .. }
1506            | HydroRoot::Null { .. } => {}
1507        }
1508    }
1509}
1510
/// Returns the clock id of the first `Tick` found while unwrapping `Atomic`
/// layers of `loc`, or `None` when any other variant is reached first.
#[cfg(feature = "build")]
fn tick_of(loc: &LocationId) -> Option<ClockId> {
    let mut current = loc;
    loop {
        match current {
            LocationId::Tick(id, _) => return Some(*id),
            // Look through the atomic wrapper at the location it encloses.
            LocationId::Atomic(inner) => current = &**inner,
            _ => return None,
        }
    }
}
1519
/// Replaces every tick id inside `loc` with its representative from `uf`.
///
/// `Tick` and `Atomic` layers are walked recursively; `Process` and `Cluster`
/// end the walk and are left unchanged.
#[cfg(feature = "build")]
fn remap_location(loc: &mut LocationId, uf: &mut HashMap<ClockId, ClockId>) {
    let nested = match loc {
        LocationId::Process(_) | LocationId::Cluster(_) => return,
        LocationId::Atomic(inner) => inner,
        LocationId::Tick(id, inner) => {
            *id = uf_find(uf, *id);
            inner
        }
    };
    remap_location(nested, uf);
}
1533
/// Finds the representative of `x` in the union-find `parent` map.
///
/// An id without an entry counts as its own parent. Every non-representative
/// id visited on the way is re-pointed directly at the representative.
#[cfg(feature = "build")]
fn uf_find(parent: &mut HashMap<ClockId, ClockId>, x: ClockId) -> ClockId {
    // First pass: follow parent links until an id that is its own parent.
    let mut root = x;
    loop {
        let up = parent.get(&root).copied().unwrap_or(root);
        if up == root {
            break;
        }
        root = up;
    }

    // Second pass: compress the path that was just walked.
    let mut cursor = x;
    while cursor != root {
        let up = parent.get(&cursor).copied().unwrap_or(cursor);
        parent.insert(cursor, root);
        cursor = up;
    }

    root
}
1544
/// Merges the sets containing `a` and `b` by making the representative of
/// `a` point at the representative of `b`. Does nothing when both already
/// share a representative.
#[cfg(feature = "build")]
fn uf_union(parent: &mut HashMap<ClockId, ClockId>, a: ClockId, b: ClockId) {
    let (root_a, root_b) = (uf_find(parent, a), uf_find(parent, b));
    if root_a == root_b {
        return;
    }
    parent.insert(root_a, root_b);
}
1553
/// Unifies tick IDs that are linked through `Batch` and `YieldConcat` nodes
/// and rewrites node locations to use one representative tick ID per group.
///
/// Two traversals are made. The first unions the tick of such a node's
/// `inner` with the tick of the node itself whenever [`tick_of`] yields a
/// tick for both. The second remaps the `location_id` in every node's
/// metadata. Roots are passed a no-op in both traversals.
#[cfg(feature = "build")]
pub fn unify_atomic_ticks(ir: &mut [HydroRoot]) {
    let mut uf: HashMap<ClockId, ClockId> = HashMap::new();

    // Pass 1: collect unifications.
    let mut record_union = |node: &mut HydroNode| {
        let (inner, metadata) = match node {
            HydroNode::Batch { inner, metadata } | HydroNode::YieldConcat { inner, metadata } => {
                (inner, metadata)
            }
            _ => return,
        };

        let inner_tick = tick_of(&inner.metadata().location_id);
        let outer_tick = tick_of(&metadata.location_id);
        if let Some((a, b)) = inner_tick.zip(outer_tick) {
            uf_union(&mut uf, a, b);
        }
    };
    transform_bottom_up(ir, &mut |_| {}, &mut record_union, false);

    // Pass 2: rewrite all LocationIds.
    let mut rewrite_location = |node: &mut HydroNode| {
        remap_location(&mut node.metadata_mut().location_id, &mut uf);
    };
    transform_bottom_up(ir, &mut |_| {}, &mut rewrite_location, false);
}
1589
/// Emits every root in `ir` and returns the graph builders that were filled
/// in along the way.
///
/// The seen/built tee maps and the statement-ID counter (starting at zero)
/// are shared across all roots.
#[cfg(feature = "build")]
pub fn emit(ir: &mut Vec<HydroRoot>) -> SecondaryMap<LocationKey, FlatGraphBuilder> {
    let mut builders = SecondaryMap::new();
    let mut seen_tees = HashMap::new();
    let mut built_tees = HashMap::new();
    let mut next_stmt_id = 0;

    ir.iter_mut().for_each(|root| {
        root.emit(
            &mut builders,
            &mut seen_tees,
            &mut built_tees,
            &mut next_stmt_id,
        );
    });

    builders
}
1606
/// Runs `emit_core` over every root in `ir` with a
/// `BuildersOrCallback::Callback` built from the two closures.
///
/// The seen/built tee maps and the statement-ID counter (starting at zero)
/// are shared across all roots.
#[cfg(feature = "build")]
pub fn traverse_dfir(
    ir: &mut [HydroRoot],
    transform_root: impl FnMut(&mut HydroRoot, &mut usize),
    transform_node: impl FnMut(&mut HydroNode, &mut usize),
) {
    let mut seen_tees = HashMap::new();
    let mut built_tees = HashMap::new();
    let mut next_stmt_id = 0;
    let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);

    for root in ir.iter_mut() {
        root.emit_core(
            &mut callback,
            &mut seen_tees,
            &mut built_tees,
            &mut next_stmt_id,
        );
    }
}
1626
1627pub fn transform_bottom_up(
1628    ir: &mut [HydroRoot],
1629    transform_root: &mut impl FnMut(&mut HydroRoot),
1630    transform_node: &mut impl FnMut(&mut HydroNode),
1631    check_well_formed: bool,
1632) {
1633    let mut seen_tees = HashMap::new();
1634    ir.iter_mut().for_each(|leaf| {
1635        leaf.transform_bottom_up(
1636            transform_root,
1637            transform_node,
1638            &mut seen_tees,
1639            check_well_formed,
1640        );
1641    });
1642}
1643
1644pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1645    let mut seen_tees = HashMap::new();
1646    ir.iter()
1647        .map(|leaf| leaf.deep_clone(&mut seen_tees))
1648        .collect()
1649}
1650
/// Per-thread deduplication state: `None` while no scope is active, otherwise
/// `(next id to hand out, shared-node pointer → id already handed out)`.
type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
thread_local! {
    /// Tracks shared nodes already printed by `SharedNode`'s `Debug` impl.
    /// Installed for the duration of a `dbg_dedup_tee` call.
    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
    /// Tracks shared nodes already serialized so that `SharedNode::serialize`
    /// emits the full subtree only once and uses a `{"$shared_ref": N}`
    /// back-reference on subsequent encounters, preventing infinite loops.
    static SERIALIZED_SHARED: PrintedTees
        = const { RefCell::new(None) };
}
1660
1661pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1662    PRINTED_TEES.with(|printed_tees| {
1663        let mut printed_tees_mut = printed_tees.borrow_mut();
1664        *printed_tees_mut = Some((0, HashMap::new()));
1665        drop(printed_tees_mut);
1666
1667        let ret = f();
1668
1669        let mut printed_tees_mut = printed_tees.borrow_mut();
1670        *printed_tees_mut = None;
1671
1672        ret
1673    })
1674}
1675
1676/// Runs `f` with a fresh shared-node deduplication scope for serialization.
1677/// Any `SharedNode` serialized inside `f` will be tracked; the first occurrence
1678/// emits the full subtree while later occurrences emit a `{"$shared_ref": id}`
1679/// back-reference.  The tracking state is restored when `f` returns or panics.
1680pub fn serialize_dedup_shared<T>(f: impl FnOnce() -> T) -> T {
1681    let _guard = SerializedSharedGuard::enter();
1682    f()
1683}
1684
/// RAII guard that saves/restores the `SERIALIZED_SHARED` thread-local,
/// making `serialize_dedup_shared` re-entrant and panic-safe.
struct SerializedSharedGuard {
    /// Contents of `SERIALIZED_SHARED` at the moment the guard was created
    /// (`None` if no scope was active); written back when the guard drops.
    previous: Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>,
}
1690
1691impl SerializedSharedGuard {
1692    fn enter() -> Self {
1693        let previous = SERIALIZED_SHARED.with(|cell| {
1694            let mut guard = cell.borrow_mut();
1695            guard.replace((0, HashMap::new()))
1696        });
1697        Self { previous }
1698    }
1699}
1700
1701impl Drop for SerializedSharedGuard {
1702    fn drop(&mut self) {
1703        SERIALIZED_SHARED.with(|cell| {
1704            *cell.borrow_mut() = self.previous.take();
1705        });
1706    }
1707}
1708
/// A reference-counted, interior-mutable handle to a `HydroNode`.
///
/// Used as the `inner` of `HydroNode::Tee` and `HydroNode::Partition`, so
/// several handles may point at the same underlying node.
pub struct SharedNode(pub Rc<RefCell<HydroNode>>);
1710
1711impl serde::Serialize for SharedNode {
1712    /// Multiple `SharedNode`s can point to the same underlying `HydroNode` (via
1713    /// `Tee` / `Partition`).  A naïve recursive serialization would revisit the
1714    /// same subtree every time and, if the graph ever contains a cycle, loop
1715    /// forever.
1716    ///
1717    /// We keep a thread-local map (`SERIALIZED_SHARED`) from raw `Rc` pointer →
1718    /// integer id.  The first time we see a pointer we assign it the next id and
1719    /// emit the full subtree as `{"$shared": <id>, "node": …}`.  Every later
1720    /// encounter of the same pointer emits `{"$shared_ref": <id>}`, cutting the
1721    /// recursion.  Requires an active `serialize_dedup_shared` scope.
1722    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
1723        SERIALIZED_SHARED.with(|cell| {
1724            let mut guard = cell.borrow_mut();
1725            // (next_id, pointer → assigned_id)
1726            let state = guard.as_mut().ok_or_else(|| {
1727                serde::ser::Error::custom(
1728                    "SharedNode serialization requires an active serialize_dedup_shared scope",
1729                )
1730            })?;
1731            let ptr = self.0.as_ptr() as *const RefCell<HydroNode>;
1732
1733            if let Some(&id) = state.1.get(&ptr) {
1734                drop(guard);
1735                use serde::ser::SerializeMap;
1736                let mut map = serializer.serialize_map(Some(1))?;
1737                map.serialize_entry("$shared_ref", &id)?;
1738                map.end()
1739            } else {
1740                let id = state.0;
1741                state.0 += 1;
1742                state.1.insert(ptr, id);
1743                drop(guard);
1744
1745                use serde::ser::SerializeMap;
1746                let mut map = serializer.serialize_map(Some(2))?;
1747                map.serialize_entry("$shared", &id)?;
1748                map.serialize_entry("node", &*self.0.borrow())?;
1749                map.end()
1750            }
1751        })
1752    }
1753}
1754
impl SharedNode {
    /// Returns the address of the shared `RefCell` allocation behind this
    /// handle, as given by `Rc::as_ptr`.
    ///
    /// Handles cloned from the same `Rc` return the same pointer.
    pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
        Rc::as_ptr(&self.0)
    }
}
1760
1761impl Debug for SharedNode {
1762    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1763        PRINTED_TEES.with(|printed_tees| {
1764            let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
1765            let printed_tees_mut = printed_tees_mut_borrow.as_mut();
1766
1767            if let Some(printed_tees_mut) = printed_tees_mut {
1768                if let Some(existing) = printed_tees_mut
1769                    .1
1770                    .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
1771                {
1772                    write!(f, "<shared {}>", existing)
1773                } else {
1774                    let next_id = printed_tees_mut.0;
1775                    printed_tees_mut.0 += 1;
1776                    printed_tees_mut
1777                        .1
1778                        .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
1779                    drop(printed_tees_mut_borrow);
1780                    write!(f, "<shared {}>: ", next_id)?;
1781                    Debug::fmt(&self.0.borrow(), f)
1782                }
1783            } else {
1784                drop(printed_tees_mut_borrow);
1785                write!(f, "<shared>: ")?;
1786                Debug::fmt(&self.0.borrow(), f)
1787            }
1788        })
1789    }
1790}
1791
1792impl Hash for SharedNode {
1793    fn hash<H: Hasher>(&self, state: &mut H) {
1794        self.0.borrow_mut().hash(state);
1795    }
1796}
1797
/// Boundedness marker carried by the `bound` field of
/// `CollectionKind::Stream`, `CollectionKind::Optional` and
/// `CollectionKind::KeyedStream`.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum BoundKind {
    Unbounded,
    Bounded,
}
1803
/// Ordering marker carried by `CollectionKind::Stream::order` and
/// `CollectionKind::KeyedStream::value_order`.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum StreamOrder {
    NoOrder,
    TotalOrder,
}
1809
/// Retry marker carried by `CollectionKind::Stream::retry` and
/// `CollectionKind::KeyedStream::value_retry`.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum StreamRetry {
    AtLeastOnce,
    ExactlyOnce,
}
1815
/// Boundedness marker carried by `CollectionKind::KeyedSingleton::bound`.
///
/// Only the `Bounded` variant makes `CollectionKind::is_bounded` return
/// `true`.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum KeyedSingletonBoundKind {
    Unbounded,
    MonotonicValue,
    BoundedValue,
    Bounded,
}
1823
/// Boundedness marker carried by `CollectionKind::Singleton::bound`.
///
/// Only the `Bounded` variant makes `CollectionKind::is_bounded` return
/// `true`.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum SingletonBoundKind {
    Unbounded,
    Monotonic,
    Bounded,
}
1830
/// Classification of a collection, as recorded in
/// `HydroIrMetadata::collection_kind`.
///
/// Each variant carries a boundedness marker plus the element (or key and
/// value) types; the stream variants additionally carry ordering and retry
/// markers.
#[derive(Clone, PartialEq, Eq, Debug, serde::Serialize)]
pub enum CollectionKind {
    Stream {
        bound: BoundKind,
        order: StreamOrder,
        retry: StreamRetry,
        element_type: DebugType,
    },
    Singleton {
        bound: SingletonBoundKind,
        element_type: DebugType,
    },
    Optional {
        bound: BoundKind,
        element_type: DebugType,
    },
    KeyedStream {
        bound: BoundKind,
        value_order: StreamOrder,
        value_retry: StreamRetry,
        key_type: DebugType,
        value_type: DebugType,
    },
    KeyedSingleton {
        bound: KeyedSingletonBoundKind,
        key_type: DebugType,
        value_type: DebugType,
    },
}
1860
1861impl CollectionKind {
1862    pub fn is_bounded(&self) -> bool {
1863        matches!(
1864            self,
1865            CollectionKind::Stream {
1866                bound: BoundKind::Bounded,
1867                ..
1868            } | CollectionKind::Singleton {
1869                bound: SingletonBoundKind::Bounded,
1870                ..
1871            } | CollectionKind::Optional {
1872                bound: BoundKind::Bounded,
1873                ..
1874            } | CollectionKind::KeyedStream {
1875                bound: BoundKind::Bounded,
1876                ..
1877            } | CollectionKind::KeyedSingleton {
1878                bound: KeyedSingletonBoundKind::Bounded,
1879                ..
1880            }
1881        )
1882    }
1883}
1884
/// Metadata attached to a `HydroNode`.
///
/// Its `Hash` and `PartialEq` impls ignore every field, so metadata does not
/// take part in hashing or comparing the nodes that carry it.
#[derive(Clone, serde::Serialize)]
pub struct HydroIrMetadata {
    /// Location the node is associated with.
    pub location_id: LocationId,
    /// Kind of collection recorded for the node.
    pub collection_kind: CollectionKind,
    // NOTE(review): presumably an estimated or measured element count — confirm with the code that populates it.
    pub cardinality: Option<usize>,
    /// Optional free-form tag.
    pub tag: Option<String>,
    /// Operator-specific metadata (see `HydroIrOpMetadata`).
    pub op: HydroIrOpMetadata,
}
1893
// HydroIrMetadata shouldn't be used to hash or compare
impl Hash for HydroIrMetadata {
    /// Deliberately feeds nothing into the hasher, so metadata never
    /// influences the derived hash of the `HydroNode` that carries it.
    fn hash<H: Hasher>(&self, _: &mut H) {}
}
1898
impl PartialEq for HydroIrMetadata {
    /// Always returns `true`: any two metadata values compare equal, which
    /// matches the `Hash` impl that hashes nothing.
    fn eq(&self, _: &Self) -> bool {
        true
    }
}

// `eq` is trivially reflexive, symmetric and transitive, so `Eq` holds.
impl Eq for HydroIrMetadata {}
1906
1907impl Debug for HydroIrMetadata {
1908    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1909        f.debug_struct("HydroIrMetadata")
1910            .field("location_id", &self.location_id)
1911            .field("collection_kind", &self.collection_kind)
1912            .finish()
1913    }
1914}
1915
/// Metadata that is specific to the operator itself, rather than its outputs.
/// This is available on _both_ inner nodes and roots.
#[derive(Clone, serde::Serialize)]
pub struct HydroIrOpMetadata {
    /// Backtrace captured when the metadata was created. Serialized under the
    /// key `span` through `serialize_backtrace_as_span`.
    #[serde(rename = "span", serialize_with = "serialize_backtrace_as_span")]
    pub backtrace: Backtrace,
    /// Unset (`None`) on construction.
    // NOTE(review): units and the producer of this value are not visible here — confirm.
    pub cpu_usage: Option<f64>,
    /// Unset (`None`) on construction.
    // NOTE(review): units and the producer of this value are not visible here — confirm.
    pub network_recv_cpu_usage: Option<f64>,
    /// Unset (`None`) on construction.
    pub id: Option<usize>,
}
1926
impl HydroIrOpMetadata {
    /// Creates operator metadata with a freshly captured backtrace and every
    /// other field set to `None`.
    #[expect(
        clippy::new_without_default,
        reason = "explicit calls to new ensure correct backtrace bounds"
    )]
    pub fn new() -> HydroIrOpMetadata {
        Self::new_with_skip(1)
    }

    /// Same as `new`, but passes `2 + skip_count` to
    /// `Backtrace::get_backtrace`.
    // NOTE(review): the constant 2 presumably accounts for this function's own
    // frames so the trace starts at the caller — confirm against
    // `Backtrace::get_backtrace`. Changing the call nesting here would shift
    // which frames are captured.
    fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
        HydroIrOpMetadata {
            backtrace: Backtrace::get_backtrace(2 + skip_count),
            cpu_usage: None,
            network_recv_cpu_usage: None,
            id: None,
        }
    }
}
1945
1946impl Debug for HydroIrOpMetadata {
1947    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1948        f.debug_struct("HydroIrOpMetadata").finish()
1949    }
1950}
1951
impl Hash for HydroIrOpMetadata {
    /// Deliberately feeds nothing into the hasher, so operator metadata never
    /// contributes to a hash.
    fn hash<H: Hasher>(&self, _: &mut H) {}
}
1955
/// An intermediate node in a Hydro graph, which consumes data
/// from upstream nodes and emits data to downstream nodes.
///
/// Every variant except `Placeholder` carries a `metadata` field. Inputs are
/// owned through `Box<HydroNode>`, except for `Tee` and `Partition`, whose
/// input is a `SharedNode` that several nodes may point at.
#[derive(Debug, Hash, serde::Serialize)]
pub enum HydroNode {
    /// Stand-in value left in a shared cell while its real contents are being
    /// transformed or cloned. `transform_children` panics when called on it.
    Placeholder,

    /// Manually "casts" between two different collection kinds.
    ///
    /// Using this IR node requires special care, since it bypasses many of Hydro's core
    /// correctness checks. In particular, the user must ensure that every possible
    /// "interpretation" of the input corresponds to a distinct "interpretation" of the output,
    /// where an "interpretation" is a possible output of `ObserveNonDet` applied to the
    /// collection. This ensures that the simulator does not miss any possible outputs.
    Cast {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Strengthens the guarantees of a stream by non-deterministically selecting a possible
    /// interpretation of the input stream.
    ///
    /// In production, this simply passes through the input, but in simulation, this operator
    /// explicitly selects a randomized interpretation.
    ObserveNonDet {
        inner: Box<HydroNode>,
        trusted: bool, // if true, we do not need to simulate non-determinism
        metadata: HydroIrMetadata,
    },

    Source {
        source: HydroSource,
        metadata: HydroIrMetadata,
    },

    SingletonSource {
        value: DebugExpr,
        first_tick_only: bool,
        metadata: HydroIrMetadata,
    },

    CycleSource {
        cycle_id: CycleId,
        metadata: HydroIrMetadata,
    },

    /// A consumer of a node that is shared through a `SharedNode`.
    Tee {
        inner: SharedNode,
        metadata: HydroIrMetadata,
    },

    /// A consumer of a shared node that additionally carries a predicate `f`
    /// and an `is_true` flag.
    // NOTE(review): presumably keeps the elements for which `f` evaluates to `is_true` — confirm against the emit code.
    Partition {
        inner: SharedNode,
        f: DebugExpr,
        is_true: bool,
        metadata: HydroIrMetadata,
    },

    BeginAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    EndAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Batch {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    YieldConcat {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Chain {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    ChainFirst {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    CrossProduct {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    CrossSingleton {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Join {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Asymmetric join where the right (build) side is bounded.
    /// The build side is accumulated (stratum-delayed) into a hash table,
    /// then the left (probe) side streams through preserving its ordering.
    JoinHalf {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Difference {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    AntiJoin {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    ResolveFutures {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ResolveFuturesBlocking {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ResolveFuturesOrdered {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Map {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FlatMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FlatMapStreamBlocking {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Filter {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FilterMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    DeferTick {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Enumerate {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Inspect {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Unique {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Sort {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Fold {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Scan {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ScanAsyncBlocking {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FoldKeyed {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Reduce {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ReduceKeyed {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Keyed reduction with a second input, `watermark`, alongside `input`.
    ReduceKeyedWatermark {
        f: DebugExpr,
        input: Box<HydroNode>,
        watermark: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// The only variant exempt from the location check in
    /// `transform_bottom_up`: its input may sit at a different root location.
    Network {
        name: Option<String>,
        networking_info: crate::networking::NetworkingInfo,
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// An input with no upstream `HydroNode`; treated as a leaf by
    /// `transform_children`.
    ExternalInput {
        from_external_key: LocationKey,
        from_port_id: ExternalPortId,
        from_many: bool,
        codec_type: DebugType,
        // Left out of the serialized form.
        #[serde(skip)]
        port_hint: NetworkHint,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        metadata: HydroIrMetadata,
    },

    Counter {
        tag: String,
        duration: DebugExpr,
        prefix: String,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
}
2219
/// Maps the pointer of an original shared node to the replacement cell that
/// has been created for it, so that each shared node is processed only once
/// during a traversal or deep clone and all of its consumers end up pointing
/// at the same replacement.
pub type SeenSharedNodes = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
/// Maps the pointer of a shared node to a `LocationId` recorded for it.
pub type SeenSharedNodeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
2222
2223impl HydroNode {
2224    pub fn transform_bottom_up(
2225        &mut self,
2226        transform: &mut impl FnMut(&mut HydroNode),
2227        seen_tees: &mut SeenSharedNodes,
2228        check_well_formed: bool,
2229    ) {
2230        self.transform_children(
2231            |n, s| n.transform_bottom_up(transform, s, check_well_formed),
2232            seen_tees,
2233        );
2234
2235        transform(self);
2236
2237        let self_location = self.metadata().location_id.root();
2238
2239        if check_well_formed {
2240            match &*self {
2241                HydroNode::Network { .. } => {}
2242                _ => {
2243                    self.input_metadata().iter().for_each(|i| {
2244                        if i.location_id.root() != self_location {
2245                            panic!(
2246                                "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
2247                                i,
2248                                i.location_id.root(),
2249                                self,
2250                                self_location
2251                            )
2252                        }
2253                    });
2254                }
2255            }
2256        }
2257    }
2258
2259    #[inline(always)]
2260    pub fn transform_children(
2261        &mut self,
2262        mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
2263        seen_tees: &mut SeenSharedNodes,
2264    ) {
2265        match self {
2266            HydroNode::Placeholder => {
2267                panic!();
2268            }
2269
2270            HydroNode::Source { .. }
2271            | HydroNode::SingletonSource { .. }
2272            | HydroNode::CycleSource { .. }
2273            | HydroNode::ExternalInput { .. } => {}
2274
2275            HydroNode::Tee { inner, .. } => {
2276                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2277                    *inner = SharedNode(transformed.clone());
2278                } else {
2279                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2280                    seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2281                    let mut orig = inner.0.replace(HydroNode::Placeholder);
2282                    transform(&mut orig, seen_tees);
2283                    *transformed_cell.borrow_mut() = orig;
2284                    *inner = SharedNode(transformed_cell);
2285                }
2286            }
2287
2288            HydroNode::Partition { inner, .. } => {
2289                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2290                    *inner = SharedNode(transformed.clone());
2291                } else {
2292                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2293                    seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2294                    let mut orig = inner.0.replace(HydroNode::Placeholder);
2295                    transform(&mut orig, seen_tees);
2296                    *transformed_cell.borrow_mut() = orig;
2297                    *inner = SharedNode(transformed_cell);
2298                }
2299            }
2300
2301            HydroNode::Cast { inner, .. }
2302            | HydroNode::ObserveNonDet { inner, .. }
2303            | HydroNode::BeginAtomic { inner, .. }
2304            | HydroNode::EndAtomic { inner, .. }
2305            | HydroNode::Batch { inner, .. }
2306            | HydroNode::YieldConcat { inner, .. } => {
2307                transform(inner.as_mut(), seen_tees);
2308            }
2309
2310            HydroNode::Chain { first, second, .. } => {
2311                transform(first.as_mut(), seen_tees);
2312                transform(second.as_mut(), seen_tees);
2313            }
2314
2315            HydroNode::ChainFirst { first, second, .. } => {
2316                transform(first.as_mut(), seen_tees);
2317                transform(second.as_mut(), seen_tees);
2318            }
2319
2320            HydroNode::CrossSingleton { left, right, .. }
2321            | HydroNode::CrossProduct { left, right, .. }
2322            | HydroNode::Join { left, right, .. }
2323            | HydroNode::JoinHalf { left, right, .. } => {
2324                transform(left.as_mut(), seen_tees);
2325                transform(right.as_mut(), seen_tees);
2326            }
2327
2328            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
2329                transform(pos.as_mut(), seen_tees);
2330                transform(neg.as_mut(), seen_tees);
2331            }
2332
2333            HydroNode::ReduceKeyedWatermark {
2334                input, watermark, ..
2335            } => {
2336                transform(input.as_mut(), seen_tees);
2337                transform(watermark.as_mut(), seen_tees);
2338            }
2339
2340            HydroNode::Map { input, .. }
2341            | HydroNode::ResolveFutures { input, .. }
2342            | HydroNode::ResolveFuturesBlocking { input, .. }
2343            | HydroNode::ResolveFuturesOrdered { input, .. }
2344            | HydroNode::FlatMap { input, .. }
2345            | HydroNode::FlatMapStreamBlocking { input, .. }
2346            | HydroNode::Filter { input, .. }
2347            | HydroNode::FilterMap { input, .. }
2348            | HydroNode::Sort { input, .. }
2349            | HydroNode::DeferTick { input, .. }
2350            | HydroNode::Enumerate { input, .. }
2351            | HydroNode::Inspect { input, .. }
2352            | HydroNode::Unique { input, .. }
2353            | HydroNode::Network { input, .. }
2354            | HydroNode::Fold { input, .. }
2355            | HydroNode::Scan { input, .. }
2356            | HydroNode::ScanAsyncBlocking { input, .. }
2357            | HydroNode::FoldKeyed { input, .. }
2358            | HydroNode::Reduce { input, .. }
2359            | HydroNode::ReduceKeyed { input, .. }
2360            | HydroNode::Counter { input, .. } => {
2361                transform(input.as_mut(), seen_tees);
2362            }
2363        }
2364    }
2365
2366    pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroNode {
2367        match self {
2368            HydroNode::Placeholder => HydroNode::Placeholder,
2369            HydroNode::Cast { inner, metadata } => HydroNode::Cast {
2370                inner: Box::new(inner.deep_clone(seen_tees)),
2371                metadata: metadata.clone(),
2372            },
2373            HydroNode::ObserveNonDet {
2374                inner,
2375                trusted,
2376                metadata,
2377            } => HydroNode::ObserveNonDet {
2378                inner: Box::new(inner.deep_clone(seen_tees)),
2379                trusted: *trusted,
2380                metadata: metadata.clone(),
2381            },
2382            HydroNode::Source { source, metadata } => HydroNode::Source {
2383                source: source.clone(),
2384                metadata: metadata.clone(),
2385            },
2386            HydroNode::SingletonSource {
2387                value,
2388                first_tick_only,
2389                metadata,
2390            } => HydroNode::SingletonSource {
2391                value: value.clone(),
2392                first_tick_only: *first_tick_only,
2393                metadata: metadata.clone(),
2394            },
2395            HydroNode::CycleSource { cycle_id, metadata } => HydroNode::CycleSource {
2396                cycle_id: *cycle_id,
2397                metadata: metadata.clone(),
2398            },
2399            HydroNode::Tee { inner, metadata } => {
2400                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2401                    HydroNode::Tee {
2402                        inner: SharedNode(transformed.clone()),
2403                        metadata: metadata.clone(),
2404                    }
2405                } else {
2406                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2407                    seen_tees.insert(inner.as_ptr(), new_rc.clone());
2408                    let cloned = inner.0.borrow().deep_clone(seen_tees);
2409                    *new_rc.borrow_mut() = cloned;
2410                    HydroNode::Tee {
2411                        inner: SharedNode(new_rc),
2412                        metadata: metadata.clone(),
2413                    }
2414                }
2415            }
2416            HydroNode::Partition {
2417                inner,
2418                f,
2419                is_true,
2420                metadata,
2421            } => {
2422                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2423                    HydroNode::Partition {
2424                        inner: SharedNode(transformed.clone()),
2425                        f: f.clone(),
2426                        is_true: *is_true,
2427                        metadata: metadata.clone(),
2428                    }
2429                } else {
2430                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2431                    seen_tees.insert(inner.as_ptr(), new_rc.clone());
2432                    let cloned = inner.0.borrow().deep_clone(seen_tees);
2433                    *new_rc.borrow_mut() = cloned;
2434                    HydroNode::Partition {
2435                        inner: SharedNode(new_rc),
2436                        f: f.clone(),
2437                        is_true: *is_true,
2438                        metadata: metadata.clone(),
2439                    }
2440                }
2441            }
2442            HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
2443                inner: Box::new(inner.deep_clone(seen_tees)),
2444                metadata: metadata.clone(),
2445            },
2446            HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
2447                inner: Box::new(inner.deep_clone(seen_tees)),
2448                metadata: metadata.clone(),
2449            },
2450            HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
2451                inner: Box::new(inner.deep_clone(seen_tees)),
2452                metadata: metadata.clone(),
2453            },
2454            HydroNode::Batch { inner, metadata } => HydroNode::Batch {
2455                inner: Box::new(inner.deep_clone(seen_tees)),
2456                metadata: metadata.clone(),
2457            },
2458            HydroNode::Chain {
2459                first,
2460                second,
2461                metadata,
2462            } => HydroNode::Chain {
2463                first: Box::new(first.deep_clone(seen_tees)),
2464                second: Box::new(second.deep_clone(seen_tees)),
2465                metadata: metadata.clone(),
2466            },
2467            HydroNode::ChainFirst {
2468                first,
2469                second,
2470                metadata,
2471            } => HydroNode::ChainFirst {
2472                first: Box::new(first.deep_clone(seen_tees)),
2473                second: Box::new(second.deep_clone(seen_tees)),
2474                metadata: metadata.clone(),
2475            },
2476            HydroNode::CrossProduct {
2477                left,
2478                right,
2479                metadata,
2480            } => HydroNode::CrossProduct {
2481                left: Box::new(left.deep_clone(seen_tees)),
2482                right: Box::new(right.deep_clone(seen_tees)),
2483                metadata: metadata.clone(),
2484            },
2485            HydroNode::CrossSingleton {
2486                left,
2487                right,
2488                metadata,
2489            } => HydroNode::CrossSingleton {
2490                left: Box::new(left.deep_clone(seen_tees)),
2491                right: Box::new(right.deep_clone(seen_tees)),
2492                metadata: metadata.clone(),
2493            },
2494            HydroNode::Join {
2495                left,
2496                right,
2497                metadata,
2498            } => HydroNode::Join {
2499                left: Box::new(left.deep_clone(seen_tees)),
2500                right: Box::new(right.deep_clone(seen_tees)),
2501                metadata: metadata.clone(),
2502            },
2503            HydroNode::JoinHalf {
2504                left,
2505                right,
2506                metadata,
2507            } => HydroNode::JoinHalf {
2508                left: Box::new(left.deep_clone(seen_tees)),
2509                right: Box::new(right.deep_clone(seen_tees)),
2510                metadata: metadata.clone(),
2511            },
2512            HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
2513                pos: Box::new(pos.deep_clone(seen_tees)),
2514                neg: Box::new(neg.deep_clone(seen_tees)),
2515                metadata: metadata.clone(),
2516            },
2517            HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
2518                pos: Box::new(pos.deep_clone(seen_tees)),
2519                neg: Box::new(neg.deep_clone(seen_tees)),
2520                metadata: metadata.clone(),
2521            },
2522            HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
2523                input: Box::new(input.deep_clone(seen_tees)),
2524                metadata: metadata.clone(),
2525            },
2526            HydroNode::ResolveFuturesBlocking { input, metadata } => {
2527                HydroNode::ResolveFuturesBlocking {
2528                    input: Box::new(input.deep_clone(seen_tees)),
2529                    metadata: metadata.clone(),
2530                }
2531            }
2532            HydroNode::ResolveFuturesOrdered { input, metadata } => {
2533                HydroNode::ResolveFuturesOrdered {
2534                    input: Box::new(input.deep_clone(seen_tees)),
2535                    metadata: metadata.clone(),
2536                }
2537            }
2538            HydroNode::Map { f, input, metadata } => HydroNode::Map {
2539                f: f.clone(),
2540                input: Box::new(input.deep_clone(seen_tees)),
2541                metadata: metadata.clone(),
2542            },
2543            HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
2544                f: f.clone(),
2545                input: Box::new(input.deep_clone(seen_tees)),
2546                metadata: metadata.clone(),
2547            },
2548            HydroNode::FlatMapStreamBlocking { f, input, metadata } => {
2549                HydroNode::FlatMapStreamBlocking {
2550                    f: f.clone(),
2551                    input: Box::new(input.deep_clone(seen_tees)),
2552                    metadata: metadata.clone(),
2553                }
2554            }
2555            HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
2556                f: f.clone(),
2557                input: Box::new(input.deep_clone(seen_tees)),
2558                metadata: metadata.clone(),
2559            },
2560            HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
2561                f: f.clone(),
2562                input: Box::new(input.deep_clone(seen_tees)),
2563                metadata: metadata.clone(),
2564            },
2565            HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
2566                input: Box::new(input.deep_clone(seen_tees)),
2567                metadata: metadata.clone(),
2568            },
2569            HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
2570                input: Box::new(input.deep_clone(seen_tees)),
2571                metadata: metadata.clone(),
2572            },
2573            HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
2574                f: f.clone(),
2575                input: Box::new(input.deep_clone(seen_tees)),
2576                metadata: metadata.clone(),
2577            },
2578            HydroNode::Unique { input, metadata } => HydroNode::Unique {
2579                input: Box::new(input.deep_clone(seen_tees)),
2580                metadata: metadata.clone(),
2581            },
2582            HydroNode::Sort { input, metadata } => HydroNode::Sort {
2583                input: Box::new(input.deep_clone(seen_tees)),
2584                metadata: metadata.clone(),
2585            },
2586            HydroNode::Fold {
2587                init,
2588                acc,
2589                input,
2590                metadata,
2591            } => HydroNode::Fold {
2592                init: init.clone(),
2593                acc: acc.clone(),
2594                input: Box::new(input.deep_clone(seen_tees)),
2595                metadata: metadata.clone(),
2596            },
2597            HydroNode::Scan {
2598                init,
2599                acc,
2600                input,
2601                metadata,
2602            } => HydroNode::Scan {
2603                init: init.clone(),
2604                acc: acc.clone(),
2605                input: Box::new(input.deep_clone(seen_tees)),
2606                metadata: metadata.clone(),
2607            },
2608            HydroNode::ScanAsyncBlocking {
2609                init,
2610                acc,
2611                input,
2612                metadata,
2613            } => HydroNode::ScanAsyncBlocking {
2614                init: init.clone(),
2615                acc: acc.clone(),
2616                input: Box::new(input.deep_clone(seen_tees)),
2617                metadata: metadata.clone(),
2618            },
2619            HydroNode::FoldKeyed {
2620                init,
2621                acc,
2622                input,
2623                metadata,
2624            } => HydroNode::FoldKeyed {
2625                init: init.clone(),
2626                acc: acc.clone(),
2627                input: Box::new(input.deep_clone(seen_tees)),
2628                metadata: metadata.clone(),
2629            },
2630            HydroNode::ReduceKeyedWatermark {
2631                f,
2632                input,
2633                watermark,
2634                metadata,
2635            } => HydroNode::ReduceKeyedWatermark {
2636                f: f.clone(),
2637                input: Box::new(input.deep_clone(seen_tees)),
2638                watermark: Box::new(watermark.deep_clone(seen_tees)),
2639                metadata: metadata.clone(),
2640            },
2641            HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
2642                f: f.clone(),
2643                input: Box::new(input.deep_clone(seen_tees)),
2644                metadata: metadata.clone(),
2645            },
2646            HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
2647                f: f.clone(),
2648                input: Box::new(input.deep_clone(seen_tees)),
2649                metadata: metadata.clone(),
2650            },
2651            HydroNode::Network {
2652                name,
2653                networking_info,
2654                serialize_fn,
2655                instantiate_fn,
2656                deserialize_fn,
2657                input,
2658                metadata,
2659            } => HydroNode::Network {
2660                name: name.clone(),
2661                networking_info: networking_info.clone(),
2662                serialize_fn: serialize_fn.clone(),
2663                instantiate_fn: instantiate_fn.clone(),
2664                deserialize_fn: deserialize_fn.clone(),
2665                input: Box::new(input.deep_clone(seen_tees)),
2666                metadata: metadata.clone(),
2667            },
2668            HydroNode::ExternalInput {
2669                from_external_key,
2670                from_port_id,
2671                from_many,
2672                codec_type,
2673                port_hint,
2674                instantiate_fn,
2675                deserialize_fn,
2676                metadata,
2677            } => HydroNode::ExternalInput {
2678                from_external_key: *from_external_key,
2679                from_port_id: *from_port_id,
2680                from_many: *from_many,
2681                codec_type: codec_type.clone(),
2682                port_hint: *port_hint,
2683                instantiate_fn: instantiate_fn.clone(),
2684                deserialize_fn: deserialize_fn.clone(),
2685                metadata: metadata.clone(),
2686            },
2687            HydroNode::Counter {
2688                tag,
2689                duration,
2690                prefix,
2691                input,
2692                metadata,
2693            } => HydroNode::Counter {
2694                tag: tag.clone(),
2695                duration: duration.clone(),
2696                prefix: prefix.clone(),
2697                input: Box::new(input.deep_clone(seen_tees)),
2698                metadata: metadata.clone(),
2699            },
2700        }
2701    }
2702
2703    #[cfg(feature = "build")]
2704    pub fn emit_core(
2705        &mut self,
2706        builders_or_callback: &mut BuildersOrCallback<
2707            impl FnMut(&mut HydroRoot, &mut usize),
2708            impl FnMut(&mut HydroNode, &mut usize),
2709        >,
2710        seen_tees: &mut SeenSharedNodes,
2711        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
2712        next_stmt_id: &mut usize,
2713    ) -> syn::Ident {
2714        let mut ident_stack: Vec<syn::Ident> = Vec::new();
2715
2716        self.transform_bottom_up(
2717            &mut |node: &mut HydroNode| {
2718                let out_location = node.metadata().location_id.clone();
2719                match node {
2720                    HydroNode::Placeholder => {
2721                        panic!()
2722                    }
2723
2724                    HydroNode::Cast { .. } => {
2725                        // Cast passes through the input ident unchanged
2726                        // The input ident is already on the stack from processing the child
2727                        match builders_or_callback {
2728                            BuildersOrCallback::Builders(_) => {}
2729                            BuildersOrCallback::Callback(_, node_callback) => {
2730                                node_callback(node, next_stmt_id);
2731                            }
2732                        }
2733
2734                        *next_stmt_id += 1;
2735                        // input_ident stays on stack as output
2736                    }
2737
2738                    HydroNode::ObserveNonDet {
2739                        inner,
2740                        trusted,
2741                        metadata,
2742                        ..
2743                    } => {
2744                        let inner_ident = ident_stack.pop().unwrap();
2745
2746                        let observe_ident =
2747                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2748
2749                        match builders_or_callback {
2750                            BuildersOrCallback::Builders(graph_builders) => {
2751                                graph_builders.observe_nondet(
2752                                    *trusted,
2753                                    &inner.metadata().location_id,
2754                                    inner_ident,
2755                                    &inner.metadata().collection_kind,
2756                                    &observe_ident,
2757                                    &metadata.collection_kind,
2758                                    &metadata.op,
2759                                );
2760                            }
2761                            BuildersOrCallback::Callback(_, node_callback) => {
2762                                node_callback(node, next_stmt_id);
2763                            }
2764                        }
2765
2766                        *next_stmt_id += 1;
2767
2768                        ident_stack.push(observe_ident);
2769                    }
2770
2771                    HydroNode::Batch {
2772                        inner, metadata, ..
2773                    } => {
2774                        let inner_ident = ident_stack.pop().unwrap();
2775
2776                        let batch_ident =
2777                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2778
2779                        match builders_or_callback {
2780                            BuildersOrCallback::Builders(graph_builders) => {
2781                                graph_builders.batch(
2782                                    inner_ident,
2783                                    &inner.metadata().location_id,
2784                                    &inner.metadata().collection_kind,
2785                                    &batch_ident,
2786                                    &out_location,
2787                                    &metadata.op,
2788                                );
2789                            }
2790                            BuildersOrCallback::Callback(_, node_callback) => {
2791                                node_callback(node, next_stmt_id);
2792                            }
2793                        }
2794
2795                        *next_stmt_id += 1;
2796
2797                        ident_stack.push(batch_ident);
2798                    }
2799
2800                    HydroNode::YieldConcat { inner, .. } => {
2801                        let inner_ident = ident_stack.pop().unwrap();
2802
2803                        let yield_ident =
2804                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2805
2806                        match builders_or_callback {
2807                            BuildersOrCallback::Builders(graph_builders) => {
2808                                graph_builders.yield_from_tick(
2809                                    inner_ident,
2810                                    &inner.metadata().location_id,
2811                                    &inner.metadata().collection_kind,
2812                                    &yield_ident,
2813                                    &out_location,
2814                                );
2815                            }
2816                            BuildersOrCallback::Callback(_, node_callback) => {
2817                                node_callback(node, next_stmt_id);
2818                            }
2819                        }
2820
2821                        *next_stmt_id += 1;
2822
2823                        ident_stack.push(yield_ident);
2824                    }
2825
2826                    HydroNode::BeginAtomic { inner, metadata } => {
2827                        let inner_ident = ident_stack.pop().unwrap();
2828
2829                        let begin_ident =
2830                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2831
2832                        match builders_or_callback {
2833                            BuildersOrCallback::Builders(graph_builders) => {
2834                                graph_builders.begin_atomic(
2835                                    inner_ident,
2836                                    &inner.metadata().location_id,
2837                                    &inner.metadata().collection_kind,
2838                                    &begin_ident,
2839                                    &out_location,
2840                                    &metadata.op,
2841                                );
2842                            }
2843                            BuildersOrCallback::Callback(_, node_callback) => {
2844                                node_callback(node, next_stmt_id);
2845                            }
2846                        }
2847
2848                        *next_stmt_id += 1;
2849
2850                        ident_stack.push(begin_ident);
2851                    }
2852
2853                    HydroNode::EndAtomic { inner, .. } => {
2854                        let inner_ident = ident_stack.pop().unwrap();
2855
2856                        let end_ident =
2857                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2858
2859                        match builders_or_callback {
2860                            BuildersOrCallback::Builders(graph_builders) => {
2861                                graph_builders.end_atomic(
2862                                    inner_ident,
2863                                    &inner.metadata().location_id,
2864                                    &inner.metadata().collection_kind,
2865                                    &end_ident,
2866                                );
2867                            }
2868                            BuildersOrCallback::Callback(_, node_callback) => {
2869                                node_callback(node, next_stmt_id);
2870                            }
2871                        }
2872
2873                        *next_stmt_id += 1;
2874
2875                        ident_stack.push(end_ident);
2876                    }
2877
2878                    HydroNode::Source {
2879                        source, metadata, ..
2880                    } => {
2881                        if let HydroSource::ExternalNetwork() = source {
2882                            ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
2883                        } else {
2884                            let source_ident =
2885                                syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2886
2887                            let source_stmt = match source {
2888                                HydroSource::Stream(expr) => {
2889                                    debug_assert!(metadata.location_id.is_top_level());
2890                                    parse_quote! {
2891                                        #source_ident = source_stream(#expr);
2892                                    }
2893                                }
2894
2895                                HydroSource::ExternalNetwork() => {
2896                                    unreachable!()
2897                                }
2898
2899                                HydroSource::Iter(expr) => {
2900                                    if metadata.location_id.is_top_level() {
2901                                        parse_quote! {
2902                                            #source_ident = source_iter(#expr);
2903                                        }
2904                                    } else {
2905                                        // TODO(shadaj): a more natural semantics would be to re-evaluate the expression on each tick
2906                                        parse_quote! {
2907                                            #source_ident = source_iter(#expr) -> persist::<'static>();
2908                                        }
2909                                    }
2910                                }
2911
2912                                HydroSource::Spin() => {
2913                                    debug_assert!(metadata.location_id.is_top_level());
2914                                    parse_quote! {
2915                                        #source_ident = spin();
2916                                    }
2917                                }
2918
2919                                HydroSource::ClusterMembers(target_loc, state) => {
2920                                    debug_assert!(metadata.location_id.is_top_level());
2921
2922                                    let members_tee_ident = syn::Ident::new(
2923                                        &format!(
2924                                            "__cluster_members_tee_{}_{}",
2925                                            metadata.location_id.root().key(),
2926                                            target_loc.key(),
2927                                        ),
2928                                        Span::call_site(),
2929                                    );
2930
2931                                    match state {
2932                                        ClusterMembersState::Stream(d) => {
2933                                            parse_quote! {
2934                                                #members_tee_ident = source_stream(#d) -> tee();
2935                                                #source_ident = #members_tee_ident;
2936                                            }
2937                                        },
2938                                        ClusterMembersState::Uninit => syn::parse_quote! {
2939                                            #source_ident = source_stream(DUMMY);
2940                                        },
2941                                        ClusterMembersState::Tee(..) => parse_quote! {
2942                                            #source_ident = #members_tee_ident;
2943                                        },
2944                                    }
2945                                }
2946
2947                                HydroSource::Embedded(ident) => {
2948                                    parse_quote! {
2949                                        #source_ident = source_stream(#ident);
2950                                    }
2951                                }
2952
2953                                HydroSource::EmbeddedSingleton(ident) => {
2954                                    parse_quote! {
2955                                        #source_ident = source_iter([#ident]);
2956                                    }
2957                                }
2958                            };
2959
2960                            match builders_or_callback {
2961                                BuildersOrCallback::Builders(graph_builders) => {
2962                                    let builder = graph_builders.get_dfir_mut(&out_location);
2963                                    builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
2964                                }
2965                                BuildersOrCallback::Callback(_, node_callback) => {
2966                                    node_callback(node, next_stmt_id);
2967                                }
2968                            }
2969
2970                            *next_stmt_id += 1;
2971
2972                            ident_stack.push(source_ident);
2973                        }
2974                    }
2975
2976                    HydroNode::SingletonSource { value, first_tick_only, metadata } => {
2977                        let source_ident =
2978                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2979
2980                        match builders_or_callback {
2981                            BuildersOrCallback::Builders(graph_builders) => {
2982                                let builder = graph_builders.get_dfir_mut(&out_location);
2983
2984                                if *first_tick_only {
2985                                    assert!(
2986                                        !metadata.location_id.is_top_level(),
2987                                        "first_tick_only SingletonSource must be inside a tick"
2988                                    );
2989                                }
2990
2991                                if *first_tick_only
2992                                    || (metadata.location_id.is_top_level()
2993                                        && metadata.collection_kind.is_bounded())
2994                                {
2995                                    builder.add_dfir(
2996                                        parse_quote! {
2997                                            #source_ident = source_iter([#value]);
2998                                        },
2999                                        None,
3000                                        Some(&next_stmt_id.to_string()),
3001                                    );
3002                                } else {
3003                                    builder.add_dfir(
3004                                        parse_quote! {
3005                                            #source_ident = source_iter([#value]) -> persist::<'static>();
3006                                        },
3007                                        None,
3008                                        Some(&next_stmt_id.to_string()),
3009                                    );
3010                                }
3011                            }
3012                            BuildersOrCallback::Callback(_, node_callback) => {
3013                                node_callback(node, next_stmt_id);
3014                            }
3015                        }
3016
3017                        *next_stmt_id += 1;
3018
3019                        ident_stack.push(source_ident);
3020                    }
3021
3022                    HydroNode::CycleSource { cycle_id, .. } => {
3023                        let ident = cycle_id.as_ident();
3024
3025                        match builders_or_callback {
3026                            BuildersOrCallback::Builders(_) => {}
3027                            BuildersOrCallback::Callback(_, node_callback) => {
3028                                node_callback(node, next_stmt_id);
3029                            }
3030                        }
3031
3032                        // consume a stmt id even though we did not emit anything so that we can instrument this
3033                        *next_stmt_id += 1;
3034
3035                        ident_stack.push(ident);
3036                    }
3037
3038                    HydroNode::Tee { inner, .. } => {
3039                        let ret_ident = if let Some(built_idents) =
3040                            built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3041                        {
3042                            match builders_or_callback {
3043                                BuildersOrCallback::Builders(_) => {}
3044                                BuildersOrCallback::Callback(_, node_callback) => {
3045                                    node_callback(node, next_stmt_id);
3046                                }
3047                            }
3048
3049                            built_idents[0].clone()
3050                        } else {
3051                            // The inner node was already processed by transform_bottom_up,
3052                            // so its ident is on the stack
3053                            let inner_ident = ident_stack.pop().unwrap();
3054
3055                            let tee_ident =
3056                                syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3057
3058                            built_tees.insert(
3059                                inner.0.as_ref() as *const RefCell<HydroNode>,
3060                                vec![tee_ident.clone()],
3061                            );
3062
3063                            match builders_or_callback {
3064                                BuildersOrCallback::Builders(graph_builders) => {
3065                                    let builder = graph_builders.get_dfir_mut(&out_location);
3066                                    builder.add_dfir(
3067                                        parse_quote! {
3068                                            #tee_ident = #inner_ident -> tee();
3069                                        },
3070                                        None,
3071                                        Some(&next_stmt_id.to_string()),
3072                                    );
3073                                }
3074                                BuildersOrCallback::Callback(_, node_callback) => {
3075                                    node_callback(node, next_stmt_id);
3076                                }
3077                            }
3078
3079                            tee_ident
3080                        };
3081
3082                        // we consume a stmt id regardless of if we emit the tee() operator,
3083                        // so that during rewrites we touch all recipients of the tee()
3084
3085                        *next_stmt_id += 1;
3086                        ident_stack.push(ret_ident);
3087                    }
3088
3089                    HydroNode::Partition {
3090                        inner, f, is_true, ..
3091                    } => {
3092                        let is_true = *is_true; // need to copy early to avoid borrow checking issues with node
3093                        let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
3094                        let ret_ident = if let Some(built_idents) = built_tees.get(&ptr) {
3095                            match builders_or_callback {
3096                                BuildersOrCallback::Builders(_) => {}
3097                                BuildersOrCallback::Callback(_, node_callback) => {
3098                                    node_callback(node, next_stmt_id);
3099                                }
3100                            }
3101
3102                            let idx = if is_true { 0 } else { 1 };
3103                            built_idents[idx].clone()
3104                        } else {
3105                            // The inner node was already processed by transform_bottom_up,
3106                            // so its ident is on the stack
3107                            let inner_ident = ident_stack.pop().unwrap();
3108
3109                            let partition_ident = syn::Ident::new(
3110                                &format!("stream_{}_partition", *next_stmt_id),
3111                                Span::call_site(),
3112                            );
3113                            let true_ident = syn::Ident::new(
3114                                &format!("stream_{}_true", *next_stmt_id),
3115                                Span::call_site(),
3116                            );
3117                            let false_ident = syn::Ident::new(
3118                                &format!("stream_{}_false", *next_stmt_id),
3119                                Span::call_site(),
3120                            );
3121
3122                            built_tees.insert(
3123                                ptr,
3124                                vec![true_ident.clone(), false_ident.clone()],
3125                            );
3126
3127                            match builders_or_callback {
3128                                BuildersOrCallback::Builders(graph_builders) => {
3129                                    let builder = graph_builders.get_dfir_mut(&out_location);
3130                                    builder.add_dfir(
3131                                        parse_quote! {
3132                                            #partition_ident = #inner_ident -> partition(|__item, __num_outputs| if (#f)(__item) { 0_usize } else { 1_usize });
3133                                            #true_ident = #partition_ident[0];
3134                                            #false_ident = #partition_ident[1];
3135                                        },
3136                                        None,
3137                                        Some(&next_stmt_id.to_string()),
3138                                    );
3139                                }
3140                                BuildersOrCallback::Callback(_, node_callback) => {
3141                                    node_callback(node, next_stmt_id);
3142                                }
3143                            }
3144
3145                            if is_true { true_ident } else { false_ident }
3146                        };
3147
3148                        *next_stmt_id += 1;
3149                        ident_stack.push(ret_ident);
3150                    }
3151
3152                    HydroNode::Chain { .. } => { // Feed both children into one `chain()` operator: first child on port 0, second on port 1.
3153                        // Children are processed left-to-right, so the second child's ident is on top of `ident_stack`
3154                        let second_ident = ident_stack.pop().unwrap();
3155                        let first_ident = ident_stack.pop().unwrap();
3156
3157                        let chain_ident =
3158                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site()); // output ident is named after the current statement id
3159
3160                        match builders_or_callback {
3161                            BuildersOrCallback::Builders(graph_builders) => {
3162                                let builder = graph_builders.get_dfir_mut(&out_location);
3163                                builder.add_dfir(
3164                                    parse_quote! {
3165                                        #chain_ident = chain();
3166                                        #first_ident -> [0]#chain_ident;
3167                                        #second_ident -> [1]#chain_ident;
3168                                    },
3169                                    None,
3170                                    Some(&next_stmt_id.to_string()),
3171                                );
3172                            }
3173                            BuildersOrCallback::Callback(_, node_callback) => {
3174                                node_callback(node, next_stmt_id); // no DFIR is added in this branch; the callback receives the node and the statement id
3175                            }
3176                        }
3177
3178                        *next_stmt_id += 1;
3179
3180                        ident_stack.push(chain_ident); // leave this node's output ident on the stack for a later arm to pop
3181                    }
3182
3183                    HydroNode::ChainFirst { .. } => { // Same two-port wiring as `Chain`, but through the `chain_first_n(1)` operator.
3184                        let second_ident = ident_stack.pop().unwrap(); // popped first: the second child's ident was pushed last
3185                        let first_ident = ident_stack.pop().unwrap();
3186
3187                        let chain_ident =
3188                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3189
3190                        match builders_or_callback {
3191                            BuildersOrCallback::Builders(graph_builders) => {
3192                                let builder = graph_builders.get_dfir_mut(&out_location);
3193                                builder.add_dfir(
3194                                    parse_quote! {
3195                                        #chain_ident = chain_first_n(1);
3196                                        #first_ident -> [0]#chain_ident;
3197                                        #second_ident -> [1]#chain_ident;
3198                                    },
3199                                    None,
3200                                    Some(&next_stmt_id.to_string()),
3201                                );
3202                            }
3203                            BuildersOrCallback::Callback(_, node_callback) => {
3204                                node_callback(node, next_stmt_id); // no DFIR is added in this branch; the callback receives the node and the statement id
3205                            }
3206                        }
3207
3208                        *next_stmt_id += 1;
3209
3210                        ident_stack.push(chain_ident); // leave this node's output ident on the stack for a later arm to pop
3211                    }
3212
3213                    HydroNode::CrossSingleton { right, .. } => { // Emit `cross_singleton()`: left child on the `[input]` port, right child on the `[single]` port.
3214                        let right_ident = ident_stack.pop().unwrap(); // popped first: the right child's ident was pushed last
3215                        let left_ident = ident_stack.pop().unwrap();
3216
3217                        let cross_ident =
3218                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3219
3220                        match builders_or_callback {
3221                            BuildersOrCallback::Builders(graph_builders) => {
3222                                let builder = graph_builders.get_dfir_mut(&out_location);
3223
3224                                if right.metadata().location_id.is_top_level() // top-level AND bounded right side: route it through `persist::<'static>()` first
3225                                    && right.metadata().collection_kind.is_bounded() // NOTE(review): presumably so the single value stays available on later ticks — confirm
3226                                {
3227                                    builder.add_dfir(
3228                                        parse_quote! {
3229                                            #cross_ident = cross_singleton();
3230                                            #left_ident -> [input]#cross_ident;
3231                                            #right_ident -> persist::<'static>() -> [single]#cross_ident;
3232                                        },
3233                                        None,
3234                                        Some(&next_stmt_id.to_string()),
3235                                    );
3236                                } else { // every other case: the right side is connected to `[single]` directly
3237                                    builder.add_dfir(
3238                                        parse_quote! {
3239                                            #cross_ident = cross_singleton();
3240                                            #left_ident -> [input]#cross_ident;
3241                                            #right_ident -> [single]#cross_ident;
3242                                        },
3243                                        None,
3244                                        Some(&next_stmt_id.to_string()),
3245                                    );
3246                                }
3247                            }
3248                            BuildersOrCallback::Callback(_, node_callback) => {
3249                                node_callback(node, next_stmt_id); // no DFIR is added in this branch; the callback receives the node and the statement id
3250                            }
3251                        }
3252
3253                        *next_stmt_id += 1;
3254
3255                        ident_stack.push(cross_ident); // leave this node's output ident on the stack for a later arm to pop
3256                    }
3257
3258                    HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => { // Shared arm: both variants emit a two-input operator with per-side lifetime arguments.
3259                        let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) { // pick the DFIR operator name by variant
3260                            parse_quote!(cross_join_multiset)
3261                        } else {
3262                            parse_quote!(join_multiset)
3263                        };
3264
3265                        let (HydroNode::CrossProduct { left, right, .. } // re-destructure to reach `left`/`right` of whichever variant matched
3266                        | HydroNode::Join { left, right, .. }) = node
3267                        else {
3268                            unreachable!() // the enclosing arm pattern only admits these two variants
3269                        };
3270
3271                        let is_top_level = left.metadata().location_id.is_top_level() // true only when BOTH inputs are top-level
3272                            && right.metadata().location_id.is_top_level();
3273                        let left_lifetime = if left.metadata().location_id.is_top_level() { // each side independently gets 'static (top-level) or 'tick
3274                            quote!('static)
3275                        } else {
3276                            quote!('tick)
3277                        };
3278
3279                        let right_lifetime = if right.metadata().location_id.is_top_level() {
3280                            quote!('static)
3281                        } else {
3282                            quote!('tick)
3283                        };
3284
3285                        let right_ident = ident_stack.pop().unwrap(); // popped first: the right child's ident was pushed last
3286                        let left_ident = ident_stack.pop().unwrap();
3287
3288                        let stream_ident =
3289                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3290
3291                        match builders_or_callback {
3292                            BuildersOrCallback::Builders(graph_builders) => {
3293                                let builder = graph_builders.get_dfir_mut(&out_location);
3294                                builder.add_dfir(
3295                                    if is_top_level {
3296                                        // if both inputs are root (top-level), the output is expected to have streamy semantics, so we need
3297                                        // a multiset_delta() to negate the replay behavior
3298                                        parse_quote! {
3299                                            #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
3300                                            #left_ident -> [0]#stream_ident;
3301                                            #right_ident -> [1]#stream_ident;
3302                                        }
3303                                    } else {
3304                                        parse_quote! {
3305                                            #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
3306                                            #left_ident -> [0]#stream_ident;
3307                                            #right_ident -> [1]#stream_ident;
3308                                        }
3309                                    }
3310                                    ,
3311                                    None,
3312                                    Some(&next_stmt_id.to_string()),
3313                                );
3314                            }
3315                            BuildersOrCallback::Callback(_, node_callback) => {
3316                                node_callback(node, next_stmt_id); // no DFIR is added in this branch; the callback receives the node and the statement id
3317                            }
3318                        }
3319
3320                        *next_stmt_id += 1;
3321
3322                        ident_stack.push(stream_ident); // leave this node's output ident on the stack for a later arm to pop
3323                    }
3324
3325                    HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => { // Shared arm: both variants emit an operator with `[pos]` and `[neg]` input ports.
3326                        let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) { // pick the DFIR operator name by variant
3327                            parse_quote!(difference)
3328                        } else {
3329                            parse_quote!(anti_join)
3330                        };
3331
3332                        let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) = // re-destructure to reach `neg` of whichever variant matched
3333                            node
3334                        else {
3335                            unreachable!() // the enclosing arm pattern only admits these two variants
3336                        };
3337
3338                        let neg_lifetime = if neg.metadata().location_id.is_top_level() { // only the negative side's lifetime varies; the first lifetime argument is always 'tick
3339                            quote!('static)
3340                        } else {
3341                            quote!('tick)
3342                        };
3343
3344                        let neg_ident = ident_stack.pop().unwrap(); // popped first: the negative side's ident is on top of the stack
3345                        let pos_ident = ident_stack.pop().unwrap();
3346
3347                        let stream_ident =
3348                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3349
3350                        match builders_or_callback {
3351                            BuildersOrCallback::Builders(graph_builders) => {
3352                                let builder = graph_builders.get_dfir_mut(&out_location);
3353                                builder.add_dfir(
3354                                    parse_quote! {
3355                                        #stream_ident = #operator::<'tick, #neg_lifetime>();
3356                                        #pos_ident -> [pos]#stream_ident;
3357                                        #neg_ident -> [neg]#stream_ident;
3358                                    },
3359                                    None,
3360                                    Some(&next_stmt_id.to_string()),
3361                                );
3362                            }
3363                            BuildersOrCallback::Callback(_, node_callback) => {
3364                                node_callback(node, next_stmt_id); // no DFIR is added in this branch; the callback receives the node and the statement id
3365                            }
3366                        }
3367
3368                        *next_stmt_id += 1;
3369
3370                        ident_stack.push(stream_ident); // leave this node's output ident on the stack for a later arm to pop
3371                    }
3372
3373                    HydroNode::JoinHalf { .. } => { // Emit `join_multiset_half` with `[probe]` and `[build]` ports; `right` is the build side.
3374                        let HydroNode::JoinHalf { right, .. } = node else { // re-destructure to reach `right`
3375                            unreachable!() // the enclosing arm pattern only admits `JoinHalf`
3376                        };
3377
3378                        assert!( // panics during code generation if the build side is not bounded
3379                            right.metadata().collection_kind.is_bounded(),
3380                            "JoinHalf requires the right (build) side to be Bounded, got {:?}",
3381                            right.metadata().collection_kind
3382                        );
3383
3384                        let build_lifetime = if right.metadata().location_id.is_top_level() { // build side: 'static when top-level, 'tick otherwise
3385                            quote!('static)
3386                        } else {
3387                            quote!('tick)
3388                        };
3389
3390                        let build_ident = ident_stack.pop().unwrap(); // popped first: the build side's ident is on top of the stack
3391                        let probe_ident = ident_stack.pop().unwrap(); // the remaining child is wired to the `[probe]` port
3392
3393                        let stream_ident =
3394                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3395
3396                        match builders_or_callback {
3397                            BuildersOrCallback::Builders(graph_builders) => {
3398                                let builder = graph_builders.get_dfir_mut(&out_location);
3399                                builder.add_dfir(
3400                                    parse_quote! {
3401                                        #stream_ident = join_multiset_half::<#build_lifetime, 'tick>();
3402                                        #probe_ident -> [probe]#stream_ident;
3403                                        #build_ident -> [build]#stream_ident;
3404                                    },
3405                                    None,
3406                                    Some(&next_stmt_id.to_string()),
3407                                );
3408                            }
3409                            BuildersOrCallback::Callback(_, node_callback) => {
3410                                node_callback(node, next_stmt_id); // no DFIR is added in this branch; the callback receives the node and the statement id
3411                            }
3412                        }
3413
3414                        *next_stmt_id += 1;
3415
3416                        ident_stack.push(stream_ident); // leave this node's output ident on the stack for a later arm to pop
3417                    }
3418
3419                    HydroNode::ResolveFutures { .. } => { // Emit `stream_N = <input> -> resolve_futures()`.
3420                        let input_ident = ident_stack.pop().unwrap(); // ident produced by this node's single input
3421
3422                        let futures_ident =
3423                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3424
3425                        match builders_or_callback {
3426                            BuildersOrCallback::Builders(graph_builders) => {
3427                                let builder = graph_builders.get_dfir_mut(&out_location);
3428                                builder.add_dfir(
3429                                    parse_quote! {
3430                                        #futures_ident = #input_ident -> resolve_futures();
3431                                    },
3432                                    None,
3433                                    Some(&next_stmt_id.to_string()),
3434                                );
3435                            }
3436                            BuildersOrCallback::Callback(_, node_callback) => {
3437                                node_callback(node, next_stmt_id); // no DFIR is added in this branch; the callback receives the node and the statement id
3438                            }
3439                        }
3440
3441                        *next_stmt_id += 1;
3442
3443                        ident_stack.push(futures_ident); // leave this node's output ident on the stack for a later arm to pop
3444                    }
3445
3446                    HydroNode::ResolveFuturesBlocking { .. } => { // Emit `stream_N = <input> -> resolve_futures_blocking()`.
3447                        let input_ident = ident_stack.pop().unwrap(); // ident produced by this node's single input
3448
3449                        let futures_ident =
3450                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3451
3452                        match builders_or_callback {
3453                            BuildersOrCallback::Builders(graph_builders) => {
3454                                let builder = graph_builders.get_dfir_mut(&out_location);
3455                                builder.add_dfir(
3456                                    parse_quote! {
3457                                        #futures_ident = #input_ident -> resolve_futures_blocking();
3458                                    },
3459                                    None,
3460                                    Some(&next_stmt_id.to_string()),
3461                                );
3462                            }
3463                            BuildersOrCallback::Callback(_, node_callback) => {
3464                                node_callback(node, next_stmt_id); // no DFIR is added in this branch; the callback receives the node and the statement id
3465                            }
3466                        }
3467
3468                        *next_stmt_id += 1;
3469
3470                        ident_stack.push(futures_ident); // leave this node's output ident on the stack for a later arm to pop
3471                    }
3472
3473                    HydroNode::ResolveFuturesOrdered { .. } => { // Emit `stream_N = <input> -> resolve_futures_ordered()`.
3474                        let input_ident = ident_stack.pop().unwrap(); // ident produced by this node's single input
3475
3476                        let futures_ident =
3477                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3478
3479                        match builders_or_callback {
3480                            BuildersOrCallback::Builders(graph_builders) => {
3481                                let builder = graph_builders.get_dfir_mut(&out_location);
3482                                builder.add_dfir(
3483                                    parse_quote! {
3484                                        #futures_ident = #input_ident -> resolve_futures_ordered();
3485                                    },
3486                                    None,
3487                                    Some(&next_stmt_id.to_string()),
3488                                );
3489                            }
3490                            BuildersOrCallback::Callback(_, node_callback) => {
3491                                node_callback(node, next_stmt_id); // no DFIR is added in this branch; the callback receives the node and the statement id
3492                            }
3493                        }
3494
3495                        *next_stmt_id += 1;
3496
3497                        ident_stack.push(futures_ident); // leave this node's output ident on the stack for a later arm to pop
3498                    }
3499
3500                    HydroNode::Map { f, .. } => { // Emit `stream_N = <input> -> map(f)`, splicing in the node's `f`.
3501                        let input_ident = ident_stack.pop().unwrap(); // ident produced by this node's single input
3502
3503                        let map_ident =
3504                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3505
3506                        match builders_or_callback {
3507                            BuildersOrCallback::Builders(graph_builders) => {
3508                                let builder = graph_builders.get_dfir_mut(&out_location);
3509                                builder.add_dfir(
3510                                    parse_quote! {
3511                                        #map_ident = #input_ident -> map(#f);
3512                                    },
3513                                    None,
3514                                    Some(&next_stmt_id.to_string()),
3515                                );
3516                            }
3517                            BuildersOrCallback::Callback(_, node_callback) => {
3518                                node_callback(node, next_stmt_id); // no DFIR is added in this branch; the callback receives the node and the statement id
3519                            }
3520                        }
3521
3522                        *next_stmt_id += 1;
3523
3524                        ident_stack.push(map_ident); // leave this node's output ident on the stack for a later arm to pop
3525                    }
3526
3527                    HydroNode::FlatMap { f, .. } => { // Emit `stream_N = <input> -> flat_map(f)`, splicing in the node's `f`.
3528                        let input_ident = ident_stack.pop().unwrap(); // ident produced by this node's single input
3529
3530                        let flat_map_ident =
3531                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3532
3533                        match builders_or_callback {
3534                            BuildersOrCallback::Builders(graph_builders) => {
3535                                let builder = graph_builders.get_dfir_mut(&out_location);
3536                                builder.add_dfir(
3537                                    parse_quote! {
3538                                        #flat_map_ident = #input_ident -> flat_map(#f);
3539                                    },
3540                                    None,
3541                                    Some(&next_stmt_id.to_string()),
3542                                );
3543                            }
3544                            BuildersOrCallback::Callback(_, node_callback) => {
3545                                node_callback(node, next_stmt_id); // no DFIR is added in this branch; the callback receives the node and the statement id
3546                            }
3547                        }
3548
3549                        *next_stmt_id += 1;
3550
3551                        ident_stack.push(flat_map_ident); // leave this node's output ident on the stack for a later arm to pop
3552                    }
3553
3554                    HydroNode::FlatMapStreamBlocking { f, .. } => { // Emit `stream_N = <input> -> flat_map_stream_blocking(f)`, splicing in the node's `f`.
3555                        let input_ident = ident_stack.pop().unwrap(); // ident produced by this node's single input
3556
3557                        let flat_map_stream_blocking_ident =
3558                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3559
3560                        match builders_or_callback {
3561                            BuildersOrCallback::Builders(graph_builders) => {
3562                                let builder = graph_builders.get_dfir_mut(&out_location);
3563                                builder.add_dfir(
3564                                    parse_quote! {
3565                                        #flat_map_stream_blocking_ident = #input_ident -> flat_map_stream_blocking(#f);
3566                                    },
3567                                    None,
3568                                    Some(&next_stmt_id.to_string()),
3569                                );
3570                            }
3571                            BuildersOrCallback::Callback(_, node_callback) => {
3572                                node_callback(node, next_stmt_id); // no DFIR is added in this branch; the callback receives the node and the statement id
3573                            }
3574                        }
3575
3576                        *next_stmt_id += 1;
3577
3578                        ident_stack.push(flat_map_stream_blocking_ident); // leave this node's output ident on the stack for a later arm to pop
3579                    }
3580
3581                    HydroNode::Filter { f, .. } => { // Emit `stream_N = <input> -> filter(f)`, splicing in the node's `f`.
3582                        let input_ident = ident_stack.pop().unwrap(); // ident produced by this node's single input
3583
3584                        let filter_ident =
3585                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3586
3587                        match builders_or_callback {
3588                            BuildersOrCallback::Builders(graph_builders) => {
3589                                let builder = graph_builders.get_dfir_mut(&out_location);
3590                                builder.add_dfir(
3591                                    parse_quote! {
3592                                        #filter_ident = #input_ident -> filter(#f);
3593                                    },
3594                                    None,
3595                                    Some(&next_stmt_id.to_string()),
3596                                );
3597                            }
3598                            BuildersOrCallback::Callback(_, node_callback) => {
3599                                node_callback(node, next_stmt_id); // no DFIR is added in this branch; the callback receives the node and the statement id
3600                            }
3601                        }
3602
3603                        *next_stmt_id += 1;
3604
3605                        ident_stack.push(filter_ident); // leave this node's output ident on the stack for a later arm to pop
3606                    }
3607
3608                    HydroNode::FilterMap { f, .. } => { // Emit `stream_N = <input> -> filter_map(f)`, splicing in the node's `f`.
3609                        let input_ident = ident_stack.pop().unwrap(); // ident produced by this node's single input
3610
3611                        let filter_map_ident =
3612                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3613
3614                        match builders_or_callback {
3615                            BuildersOrCallback::Builders(graph_builders) => {
3616                                let builder = graph_builders.get_dfir_mut(&out_location);
3617                                builder.add_dfir(
3618                                    parse_quote! {
3619                                        #filter_map_ident = #input_ident -> filter_map(#f);
3620                                    },
3621                                    None,
3622                                    Some(&next_stmt_id.to_string()),
3623                                );
3624                            }
3625                            BuildersOrCallback::Callback(_, node_callback) => {
3626                                node_callback(node, next_stmt_id); // no DFIR is added in this branch; the callback receives the node and the statement id
3627                            }
3628                        }
3629
3630                        *next_stmt_id += 1;
3631
3632                        ident_stack.push(filter_map_ident); // leave this node's output ident on the stack for a later arm to pop
3633                    }
3634
3635                    HydroNode::Sort { .. } => { // Emit `stream_N = <input> -> sort()`.
3636                        let input_ident = ident_stack.pop().unwrap(); // ident produced by this node's single input
3637
3638                        let sort_ident =
3639                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3640
3641                        match builders_or_callback {
3642                            BuildersOrCallback::Builders(graph_builders) => {
3643                                let builder = graph_builders.get_dfir_mut(&out_location);
3644                                builder.add_dfir(
3645                                    parse_quote! {
3646                                        #sort_ident = #input_ident -> sort();
3647                                    },
3648                                    None,
3649                                    Some(&next_stmt_id.to_string()),
3650                                );
3651                            }
3652                            BuildersOrCallback::Callback(_, node_callback) => {
3653                                node_callback(node, next_stmt_id); // no DFIR is added in this branch; the callback receives the node and the statement id
3654                            }
3655                        }
3656
3657                        *next_stmt_id += 1;
3658
3659                        ident_stack.push(sort_ident); // leave this node's output ident on the stack for a later arm to pop
3660                    }
3661
3662                    HydroNode::DeferTick { .. } => { // Emit `stream_N = <input> -> defer_tick_lazy()` (note: the `_lazy` operator, not `defer_tick`).
3663                        let input_ident = ident_stack.pop().unwrap(); // ident produced by this node's single input
3664
3665                        let defer_tick_ident =
3666                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3667
3668                        match builders_or_callback {
3669                            BuildersOrCallback::Builders(graph_builders) => {
3670                                let builder = graph_builders.get_dfir_mut(&out_location);
3671                                builder.add_dfir(
3672                                    parse_quote! {
3673                                        #defer_tick_ident = #input_ident -> defer_tick_lazy();
3674                                    },
3675                                    None,
3676                                    Some(&next_stmt_id.to_string()),
3677                                );
3678                            }
3679                            BuildersOrCallback::Callback(_, node_callback) => {
3680                                node_callback(node, next_stmt_id); // no DFIR is added in this branch; the callback receives the node and the statement id
3681                            }
3682                        }
3683
3684                        *next_stmt_id += 1;
3685
3686                        ident_stack.push(defer_tick_ident); // leave this node's output ident on the stack for a later arm to pop
3687                    }
3688
3689                    HydroNode::Enumerate { input, .. } => { // Emit `stream_N = <input> -> enumerate::<LIFETIME>()`.
3690                        let input_ident = ident_stack.pop().unwrap(); // ident produced by this node's single input
3691
3692                        let enumerate_ident =
3693                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3694
3695                        match builders_or_callback {
3696                            BuildersOrCallback::Builders(graph_builders) => {
3697                                let builder = graph_builders.get_dfir_mut(&out_location);
3698                                let lifetime = if input.metadata().location_id.is_top_level() { // 'static for a top-level input, 'tick otherwise
3699                                    quote!('static)
3700                                } else {
3701                                    quote!('tick)
3702                                };
3703                                builder.add_dfir(
3704                                    parse_quote! {
3705                                        #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
3706                                    },
3707                                    None,
3708                                    Some(&next_stmt_id.to_string()),
3709                                );
3710                            }
3711                            BuildersOrCallback::Callback(_, node_callback) => {
3712                                node_callback(node, next_stmt_id); // no DFIR is added in this branch; the callback receives the node and the statement id
3713                            }
3714                        }
3715
3716                        *next_stmt_id += 1;
3717
3718                        ident_stack.push(enumerate_ident); // leave this node's output ident on the stack for a later arm to pop
3719                    }
3720
3721                    HydroNode::Inspect { f, .. } => { // Emit `stream_N = <input> -> inspect(f)`, splicing in the node's `f`.
3722                        let input_ident = ident_stack.pop().unwrap(); // ident produced by this node's single input
3723
3724                        let inspect_ident =
3725                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3726
3727                        match builders_or_callback {
3728                            BuildersOrCallback::Builders(graph_builders) => {
3729                                let builder = graph_builders.get_dfir_mut(&out_location);
3730                                builder.add_dfir(
3731                                    parse_quote! {
3732                                        #inspect_ident = #input_ident -> inspect(#f);
3733                                    },
3734                                    None,
3735                                    Some(&next_stmt_id.to_string()),
3736                                );
3737                            }
3738                            BuildersOrCallback::Callback(_, node_callback) => {
3739                                node_callback(node, next_stmt_id); // no DFIR is added in this branch; the callback receives the node and the statement id
3740                            }
3741                        }
3742
3743                        *next_stmt_id += 1;
3744
3745                        ident_stack.push(inspect_ident); // leave this node's output ident on the stack for a later arm to pop
3746                    }
3747
3748                    HydroNode::Unique { input, .. } => { // Emit `stream_N = <input> -> unique::<LIFETIME>()`.
3749                        let input_ident = ident_stack.pop().unwrap(); // ident produced by this node's single input
3750
3751                        let unique_ident =
3752                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3753
3754                        match builders_or_callback {
3755                            BuildersOrCallback::Builders(graph_builders) => {
3756                                let builder = graph_builders.get_dfir_mut(&out_location);
3757                                let lifetime = if input.metadata().location_id.is_top_level() { // 'static for a top-level input, 'tick otherwise
3758                                    quote!('static)
3759                                } else {
3760                                    quote!('tick)
3761                                };
3762
3763                                builder.add_dfir(
3764                                    parse_quote! {
3765                                        #unique_ident = #input_ident -> unique::<#lifetime>();
3766                                    },
3767                                    None,
3768                                    Some(&next_stmt_id.to_string()),
3769                                );
3770                            }
3771                            BuildersOrCallback::Callback(_, node_callback) => {
3772                                node_callback(node, next_stmt_id); // no DFIR is added in this branch; the callback receives the node and the statement id
3773                            }
3774                        }
3775
3776                        *next_stmt_id += 1;
3777
3778                        ident_stack.push(unique_ident); // leave this node's output ident on the stack for a later arm to pop
3779                    }
3780
3781                    HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } | HydroNode::ScanAsyncBlocking { .. } => {
3782                        let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
3783                            if input.metadata().location_id.is_top_level()
3784                                && input.metadata().collection_kind.is_bounded()
3785                            {
3786                                parse_quote!(fold_no_replay)
3787                            } else {
3788                                parse_quote!(fold)
3789                            }
3790                        } else if matches!(node, HydroNode::Scan { .. }) {
3791                            parse_quote!(scan)
3792                        } else if matches!(node, HydroNode::ScanAsyncBlocking { .. }) {
3793                            parse_quote!(scan_async_blocking)
3794                        } else if let HydroNode::FoldKeyed { input, .. } = node {
3795                            if input.metadata().location_id.is_top_level()
3796                                && input.metadata().collection_kind.is_bounded()
3797                            {
3798                                todo!("Fold keyed on a top-level bounded collection is not yet supported")
3799                            } else {
3800                                parse_quote!(fold_keyed)
3801                            }
3802                        } else {
3803                            unreachable!()
3804                        };
3805
3806                        let (HydroNode::Fold { input, .. }
3807                        | HydroNode::FoldKeyed { input, .. }
3808                        | HydroNode::Scan { input, .. }
3809                        | HydroNode::ScanAsyncBlocking { input, .. }) = node
3810                        else {
3811                            unreachable!()
3812                        };
3813
3814                        let lifetime = if input.metadata().location_id.is_top_level() {
3815                            quote!('static)
3816                        } else {
3817                            quote!('tick)
3818                        };
3819
3820                        let input_ident = ident_stack.pop().unwrap();
3821
3822                        let (HydroNode::Fold { init, acc, .. }
3823                        | HydroNode::FoldKeyed { init, acc, .. }
3824                        | HydroNode::Scan { init, acc, .. }
3825                        | HydroNode::ScanAsyncBlocking { init, acc, .. }) = &*node
3826                        else {
3827                            unreachable!()
3828                        };
3829
3830                        let fold_ident =
3831                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3832
3833                        match builders_or_callback {
3834                            BuildersOrCallback::Builders(graph_builders) => {
3835                                if matches!(node, HydroNode::Fold { .. })
3836                                    && node.metadata().location_id.is_top_level()
3837                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3838                                    && graph_builders.singleton_intermediates()
3839                                    && !node.metadata().collection_kind.is_bounded()
3840                                {
3841                                    let builder = graph_builders.get_dfir_mut(&out_location);
3842
3843                                    let acc: syn::Expr = parse_quote!({
3844                                        let mut __inner = #acc;
3845                                        move |__state, __value| {
3846                                            __inner(__state, __value);
3847                                            Some(__state.clone())
3848                                        }
3849                                    });
3850
3851                                    builder.add_dfir(
3852                                        parse_quote! {
3853                                            source_iter([(#init)()]) -> [0]#fold_ident;
3854                                            #input_ident -> scan::<#lifetime>(#init, #acc) -> [1]#fold_ident;
3855                                            #fold_ident = chain();
3856                                        },
3857                                        None,
3858                                        Some(&next_stmt_id.to_string()),
3859                                    );
3860                                } else if matches!(node, HydroNode::FoldKeyed { .. })
3861                                    && node.metadata().location_id.is_top_level()
3862                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3863                                    && graph_builders.singleton_intermediates()
3864                                    && !node.metadata().collection_kind.is_bounded()
3865                                {
3866                                    let builder = graph_builders.get_dfir_mut(&out_location);
3867
3868                                    let acc: syn::Expr = parse_quote!({
3869                                        let mut __init = #init;
3870                                        let mut __inner = #acc;
3871                                        move |__state, __kv: (_, _)| {
3872                                            // TODO(shadaj): we can avoid the clone when the entry exists
3873                                            let __state = __state
3874                                                .entry(::std::clone::Clone::clone(&__kv.0))
3875                                                .or_insert_with(|| (__init)());
3876                                            __inner(__state, __kv.1);
3877                                            Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
3878                                        }
3879                                    });
3880
3881                                    builder.add_dfir(
3882                                        parse_quote! {
3883                                            #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #acc);
3884                                        },
3885                                        None,
3886                                        Some(&next_stmt_id.to_string()),
3887                                    );
3888                                } else {
3889                                    let builder = graph_builders.get_dfir_mut(&out_location);
3890                                    builder.add_dfir(
3891                                        parse_quote! {
3892                                            #fold_ident = #input_ident -> #operator::<#lifetime>(#init, #acc);
3893                                        },
3894                                        None,
3895                                        Some(&next_stmt_id.to_string()),
3896                                    );
3897                                }
3898                            }
3899                            BuildersOrCallback::Callback(_, node_callback) => {
3900                                node_callback(node, next_stmt_id);
3901                            }
3902                        }
3903
3904                        *next_stmt_id += 1;
3905
3906                        ident_stack.push(fold_ident);
3907                    }
3908
3909                    HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
3910                        let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
3911                            if input.metadata().location_id.is_top_level()
3912                                && input.metadata().collection_kind.is_bounded()
3913                            {
3914                                parse_quote!(reduce_no_replay)
3915                            } else {
3916                                parse_quote!(reduce)
3917                            }
3918                        } else if let HydroNode::ReduceKeyed { input, .. } = node {
3919                            if input.metadata().location_id.is_top_level()
3920                                && input.metadata().collection_kind.is_bounded()
3921                            {
3922                                todo!(
3923                                    "Calling keyed reduce on a top-level bounded collection is not supported"
3924                                )
3925                            } else {
3926                                parse_quote!(reduce_keyed)
3927                            }
3928                        } else {
3929                            unreachable!()
3930                        };
3931
3932                        let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
3933                        else {
3934                            unreachable!()
3935                        };
3936
3937                        let lifetime = if input.metadata().location_id.is_top_level() {
3938                            quote!('static)
3939                        } else {
3940                            quote!('tick)
3941                        };
3942
3943                        let input_ident = ident_stack.pop().unwrap();
3944
3945                        let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
3946                        else {
3947                            unreachable!()
3948                        };
3949
3950                        let reduce_ident =
3951                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3952
3953                        match builders_or_callback {
3954                            BuildersOrCallback::Builders(graph_builders) => {
3955                                if matches!(node, HydroNode::Reduce { .. })
3956                                    && node.metadata().location_id.is_top_level()
3957                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3958                                    && graph_builders.singleton_intermediates()
3959                                    && !node.metadata().collection_kind.is_bounded()
3960                                {
3961                                    todo!(
3962                                        "Reduce with optional intermediates is not yet supported in simulator"
3963                                    );
3964                                } else if matches!(node, HydroNode::ReduceKeyed { .. })
3965                                    && node.metadata().location_id.is_top_level()
3966                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3967                                    && graph_builders.singleton_intermediates()
3968                                    && !node.metadata().collection_kind.is_bounded()
3969                                {
3970                                    todo!(
3971                                        "Reduce keyed with optional intermediates is not yet supported in simulator"
3972                                    );
3973                                } else {
3974                                    let builder = graph_builders.get_dfir_mut(&out_location);
3975                                    builder.add_dfir(
3976                                        parse_quote! {
3977                                            #reduce_ident = #input_ident -> #operator::<#lifetime>(#f);
3978                                        },
3979                                        None,
3980                                        Some(&next_stmt_id.to_string()),
3981                                    );
3982                                }
3983                            }
3984                            BuildersOrCallback::Callback(_, node_callback) => {
3985                                node_callback(node, next_stmt_id);
3986                            }
3987                        }
3988
3989                        *next_stmt_id += 1;
3990
3991                        ident_stack.push(reduce_ident);
3992                    }
3993
3994                    HydroNode::ReduceKeyedWatermark {
3995                        f,
3996                        input,
3997                        metadata,
3998                        ..
3999                    } => {
4000                        let lifetime = if input.metadata().location_id.is_top_level() {
4001                            quote!('static)
4002                        } else {
4003                            quote!('tick)
4004                        };
4005
4006                        // watermark is processed second, so it's on top
4007                        let watermark_ident = ident_stack.pop().unwrap();
4008                        let input_ident = ident_stack.pop().unwrap();
4009
4010                        let chain_ident = syn::Ident::new(
4011                            &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
4012                            Span::call_site(),
4013                        );
4014
4015                        let fold_ident =
4016                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4017
4018                        let agg_operator: syn::Ident = if input.metadata().location_id.is_top_level()
4019                            && input.metadata().collection_kind.is_bounded()
4020                        {
4021                            parse_quote!(fold_no_replay)
4022                        } else {
4023                            parse_quote!(fold)
4024                        };
4025
4026                        match builders_or_callback {
4027                            BuildersOrCallback::Builders(graph_builders) => {
4028                                if metadata.location_id.is_top_level()
4029                                    && !(matches!(metadata.location_id, LocationId::Atomic(_)))
4030                                    && graph_builders.singleton_intermediates()
4031                                    && !metadata.collection_kind.is_bounded()
4032                                {
4033                                    todo!(
4034                                        "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
4035                                    )
4036                                } else {
4037                                    let builder = graph_builders.get_dfir_mut(&out_location);
4038                                    builder.add_dfir(
4039                                        parse_quote! {
4040                                            #chain_ident = chain();
4041                                            #input_ident
4042                                                -> map(|x| (Some(x), None))
4043                                                -> [0]#chain_ident;
4044                                            #watermark_ident
4045                                                -> map(|watermark| (None, Some(watermark)))
4046                                                -> [1]#chain_ident;
4047
4048                                            #fold_ident = #chain_ident
4049                                                -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
4050                                                    let __reduce_keyed_fn = #f;
4051                                                    move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
4052                                                        if let Some((k, v)) = opt_payload {
4053                                                            if let Some(curr_watermark) = *opt_curr_watermark {
4054                                                                if k < curr_watermark {
4055                                                                    return;
4056                                                                }
4057                                                            }
4058                                                            match map.entry(k) {
4059                                                                ::std::collections::hash_map::Entry::Vacant(e) => {
4060                                                                    e.insert(v);
4061                                                                }
4062                                                                ::std::collections::hash_map::Entry::Occupied(mut e) => {
4063                                                                    __reduce_keyed_fn(e.get_mut(), v);
4064                                                                }
4065                                                            }
4066                                                        } else {
4067                                                            let watermark = opt_watermark.unwrap();
4068                                                            if let Some(curr_watermark) = *opt_curr_watermark {
4069                                                                if watermark <= curr_watermark {
4070                                                                    return;
4071                                                                }
4072                                                            }
4073                                                            *opt_curr_watermark = opt_watermark;
4074                                                            map.retain(|k, _| *k >= watermark);
4075                                                        }
4076                                                    }
4077                                                })
4078                                                -> flat_map(|(map, _curr_watermark)| map);
4079                                        },
4080                                        None,
4081                                        Some(&next_stmt_id.to_string()),
4082                                    );
4083                                }
4084                            }
4085                            BuildersOrCallback::Callback(_, node_callback) => {
4086                                node_callback(node, next_stmt_id);
4087                            }
4088                        }
4089
4090                        *next_stmt_id += 1;
4091
4092                        ident_stack.push(fold_ident);
4093                    }
4094
4095                    HydroNode::Network {
4096                        networking_info,
4097                        serialize_fn: serialize_pipeline,
4098                        instantiate_fn,
4099                        deserialize_fn: deserialize_pipeline,
4100                        input,
4101                        ..
4102                    } => {
4103                        let input_ident = ident_stack.pop().unwrap();
4104
4105                        let receiver_stream_ident =
4106                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4107
4108                        match builders_or_callback {
4109                            BuildersOrCallback::Builders(graph_builders) => {
4110                                let (sink_expr, source_expr) = match instantiate_fn {
4111                                    DebugInstantiate::Building => (
4112                                        syn::parse_quote!(DUMMY_SINK),
4113                                        syn::parse_quote!(DUMMY_SOURCE),
4114                                    ),
4115
4116                                    DebugInstantiate::Finalized(finalized) => {
4117                                        (finalized.sink.clone(), finalized.source.clone())
4118                                    }
4119                                };
4120
4121                                graph_builders.create_network(
4122                                    &input.metadata().location_id,
4123                                    &out_location,
4124                                    input_ident,
4125                                    &receiver_stream_ident,
4126                                    serialize_pipeline.as_ref(),
4127                                    sink_expr,
4128                                    source_expr,
4129                                    deserialize_pipeline.as_ref(),
4130                                    *next_stmt_id,
4131                                    networking_info,
4132                                );
4133                            }
4134                            BuildersOrCallback::Callback(_, node_callback) => {
4135                                node_callback(node, next_stmt_id);
4136                            }
4137                        }
4138
4139                        *next_stmt_id += 1;
4140
4141                        ident_stack.push(receiver_stream_ident);
4142                    }
4143
4144                    HydroNode::ExternalInput {
4145                        instantiate_fn,
4146                        deserialize_fn: deserialize_pipeline,
4147                        ..
4148                    } => {
4149                        let receiver_stream_ident =
4150                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4151
4152                        match builders_or_callback {
4153                            BuildersOrCallback::Builders(graph_builders) => {
4154                                let (_, source_expr) = match instantiate_fn {
4155                                    DebugInstantiate::Building => (
4156                                        syn::parse_quote!(DUMMY_SINK),
4157                                        syn::parse_quote!(DUMMY_SOURCE),
4158                                    ),
4159
4160                                    DebugInstantiate::Finalized(finalized) => {
4161                                        (finalized.sink.clone(), finalized.source.clone())
4162                                    }
4163                                };
4164
4165                                graph_builders.create_external_source(
4166                                    &out_location,
4167                                    source_expr,
4168                                    &receiver_stream_ident,
4169                                    deserialize_pipeline.as_ref(),
4170                                    *next_stmt_id,
4171                                );
4172                            }
4173                            BuildersOrCallback::Callback(_, node_callback) => {
4174                                node_callback(node, next_stmt_id);
4175                            }
4176                        }
4177
4178                        *next_stmt_id += 1;
4179
4180                        ident_stack.push(receiver_stream_ident);
4181                    }
4182
4183                    HydroNode::Counter {
4184                        tag,
4185                        duration,
4186                        prefix,
4187                        ..
4188                    } => {
4189                        let input_ident = ident_stack.pop().unwrap();
4190
4191                        let counter_ident =
4192                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4193
4194                        match builders_or_callback {
4195                            BuildersOrCallback::Builders(graph_builders) => {
4196                                let arg = format!("{}({})", prefix, tag);
4197                                let builder = graph_builders.get_dfir_mut(&out_location);
4198                                builder.add_dfir(
4199                                    parse_quote! {
4200                                        #counter_ident = #input_ident -> _counter(#arg, #duration);
4201                                    },
4202                                    None,
4203                                    Some(&next_stmt_id.to_string()),
4204                                );
4205                            }
4206                            BuildersOrCallback::Callback(_, node_callback) => {
4207                                node_callback(node, next_stmt_id);
4208                            }
4209                        }
4210
4211                        *next_stmt_id += 1;
4212
4213                        ident_stack.push(counter_ident);
4214                    }
4215                }
4216            },
4217            seen_tees,
4218            false,
4219        );
4220
4221        ident_stack
4222            .pop()
4223            .expect("ident_stack should have exactly one element after traversal")
4224    }
4225
4226    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
4227        match self {
4228            HydroNode::Placeholder => {
4229                panic!()
4230            }
4231            HydroNode::Cast { .. } | HydroNode::ObserveNonDet { .. } => {}
4232            HydroNode::Source { source, .. } => match source {
4233                HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
4234                HydroSource::ExternalNetwork()
4235                | HydroSource::Spin()
4236                | HydroSource::ClusterMembers(_, _)
4237                | HydroSource::Embedded(_)
4238                | HydroSource::EmbeddedSingleton(_) => {} // TODO: what goes here?
4239            },
4240            HydroNode::SingletonSource { value, .. } => {
4241                transform(value);
4242            }
4243            HydroNode::CycleSource { .. }
4244            | HydroNode::Tee { .. }
4245            | HydroNode::YieldConcat { .. }
4246            | HydroNode::BeginAtomic { .. }
4247            | HydroNode::EndAtomic { .. }
4248            | HydroNode::Batch { .. }
4249            | HydroNode::Chain { .. }
4250            | HydroNode::ChainFirst { .. }
4251            | HydroNode::CrossProduct { .. }
4252            | HydroNode::CrossSingleton { .. }
4253            | HydroNode::ResolveFutures { .. }
4254            | HydroNode::ResolveFuturesBlocking { .. }
4255            | HydroNode::ResolveFuturesOrdered { .. }
4256            | HydroNode::Join { .. }
4257            | HydroNode::JoinHalf { .. }
4258            | HydroNode::Difference { .. }
4259            | HydroNode::AntiJoin { .. }
4260            | HydroNode::DeferTick { .. }
4261            | HydroNode::Enumerate { .. }
4262            | HydroNode::Unique { .. }
4263            | HydroNode::Sort { .. } => {}
4264            HydroNode::Map { f, .. }
4265            | HydroNode::FlatMap { f, .. }
4266            | HydroNode::FlatMapStreamBlocking { f, .. }
4267            | HydroNode::Filter { f, .. }
4268            | HydroNode::FilterMap { f, .. }
4269            | HydroNode::Inspect { f, .. }
4270            | HydroNode::Partition { f, .. }
4271            | HydroNode::Reduce { f, .. }
4272            | HydroNode::ReduceKeyed { f, .. }
4273            | HydroNode::ReduceKeyedWatermark { f, .. } => {
4274                transform(f);
4275            }
4276            HydroNode::Fold { init, acc, .. }
4277            | HydroNode::Scan { init, acc, .. }
4278            | HydroNode::ScanAsyncBlocking { init, acc, .. }
4279            | HydroNode::FoldKeyed { init, acc, .. } => {
4280                transform(init);
4281                transform(acc);
4282            }
4283            HydroNode::Network {
4284                serialize_fn,
4285                deserialize_fn,
4286                ..
4287            } => {
4288                if let Some(serialize_fn) = serialize_fn {
4289                    transform(serialize_fn);
4290                }
4291                if let Some(deserialize_fn) = deserialize_fn {
4292                    transform(deserialize_fn);
4293                }
4294            }
4295            HydroNode::ExternalInput { deserialize_fn, .. } => {
4296                if let Some(deserialize_fn) = deserialize_fn {
4297                    transform(deserialize_fn);
4298                }
4299            }
4300            HydroNode::Counter { duration, .. } => {
4301                transform(duration);
4302            }
4303        }
4304    }
4305
4306    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
4307        &self.metadata().op
4308    }
4309
4310    pub fn metadata(&self) -> &HydroIrMetadata {
4311        match self {
4312            HydroNode::Placeholder => {
4313                panic!()
4314            }
4315            HydroNode::Cast { metadata, .. } => metadata,
4316            HydroNode::ObserveNonDet { metadata, .. } => metadata,
4317            HydroNode::Source { metadata, .. } => metadata,
4318            HydroNode::SingletonSource { metadata, .. } => metadata,
4319            HydroNode::CycleSource { metadata, .. } => metadata,
4320            HydroNode::Tee { metadata, .. } => metadata,
4321            HydroNode::Partition { metadata, .. } => metadata,
4322            HydroNode::YieldConcat { metadata, .. } => metadata,
4323            HydroNode::BeginAtomic { metadata, .. } => metadata,
4324            HydroNode::EndAtomic { metadata, .. } => metadata,
4325            HydroNode::Batch { metadata, .. } => metadata,
4326            HydroNode::Chain { metadata, .. } => metadata,
4327            HydroNode::ChainFirst { metadata, .. } => metadata,
4328            HydroNode::CrossProduct { metadata, .. } => metadata,
4329            HydroNode::CrossSingleton { metadata, .. } => metadata,
4330            HydroNode::Join { metadata, .. } => metadata,
4331            HydroNode::JoinHalf { metadata, .. } => metadata,
4332            HydroNode::Difference { metadata, .. } => metadata,
4333            HydroNode::AntiJoin { metadata, .. } => metadata,
4334            HydroNode::ResolveFutures { metadata, .. } => metadata,
4335            HydroNode::ResolveFuturesBlocking { metadata, .. } => metadata,
4336            HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
4337            HydroNode::Map { metadata, .. } => metadata,
4338            HydroNode::FlatMap { metadata, .. } => metadata,
4339            HydroNode::FlatMapStreamBlocking { metadata, .. } => metadata,
4340            HydroNode::Filter { metadata, .. } => metadata,
4341            HydroNode::FilterMap { metadata, .. } => metadata,
4342            HydroNode::DeferTick { metadata, .. } => metadata,
4343            HydroNode::Enumerate { metadata, .. } => metadata,
4344            HydroNode::Inspect { metadata, .. } => metadata,
4345            HydroNode::Unique { metadata, .. } => metadata,
4346            HydroNode::Sort { metadata, .. } => metadata,
4347            HydroNode::Scan { metadata, .. } => metadata,
4348            HydroNode::ScanAsyncBlocking { metadata, .. } => metadata,
4349            HydroNode::Fold { metadata, .. } => metadata,
4350            HydroNode::FoldKeyed { metadata, .. } => metadata,
4351            HydroNode::Reduce { metadata, .. } => metadata,
4352            HydroNode::ReduceKeyed { metadata, .. } => metadata,
4353            HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
4354            HydroNode::ExternalInput { metadata, .. } => metadata,
4355            HydroNode::Network { metadata, .. } => metadata,
4356            HydroNode::Counter { metadata, .. } => metadata,
4357        }
4358    }
4359
4360    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
4361        &mut self.metadata_mut().op
4362    }
4363
4364    pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
4365        match self {
4366            HydroNode::Placeholder => {
4367                panic!()
4368            }
4369            HydroNode::Cast { metadata, .. } => metadata,
4370            HydroNode::ObserveNonDet { metadata, .. } => metadata,
4371            HydroNode::Source { metadata, .. } => metadata,
4372            HydroNode::SingletonSource { metadata, .. } => metadata,
4373            HydroNode::CycleSource { metadata, .. } => metadata,
4374            HydroNode::Tee { metadata, .. } => metadata,
4375            HydroNode::Partition { metadata, .. } => metadata,
4376            HydroNode::YieldConcat { metadata, .. } => metadata,
4377            HydroNode::BeginAtomic { metadata, .. } => metadata,
4378            HydroNode::EndAtomic { metadata, .. } => metadata,
4379            HydroNode::Batch { metadata, .. } => metadata,
4380            HydroNode::Chain { metadata, .. } => metadata,
4381            HydroNode::ChainFirst { metadata, .. } => metadata,
4382            HydroNode::CrossProduct { metadata, .. } => metadata,
4383            HydroNode::CrossSingleton { metadata, .. } => metadata,
4384            HydroNode::Join { metadata, .. } => metadata,
4385            HydroNode::JoinHalf { metadata, .. } => metadata,
4386            HydroNode::Difference { metadata, .. } => metadata,
4387            HydroNode::AntiJoin { metadata, .. } => metadata,
4388            HydroNode::ResolveFutures { metadata, .. } => metadata,
4389            HydroNode::ResolveFuturesBlocking { metadata, .. } => metadata,
4390            HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
4391            HydroNode::Map { metadata, .. } => metadata,
4392            HydroNode::FlatMap { metadata, .. } => metadata,
4393            HydroNode::FlatMapStreamBlocking { metadata, .. } => metadata,
4394            HydroNode::Filter { metadata, .. } => metadata,
4395            HydroNode::FilterMap { metadata, .. } => metadata,
4396            HydroNode::DeferTick { metadata, .. } => metadata,
4397            HydroNode::Enumerate { metadata, .. } => metadata,
4398            HydroNode::Inspect { metadata, .. } => metadata,
4399            HydroNode::Unique { metadata, .. } => metadata,
4400            HydroNode::Sort { metadata, .. } => metadata,
4401            HydroNode::Scan { metadata, .. } => metadata,
4402            HydroNode::ScanAsyncBlocking { metadata, .. } => metadata,
4403            HydroNode::Fold { metadata, .. } => metadata,
4404            HydroNode::FoldKeyed { metadata, .. } => metadata,
4405            HydroNode::Reduce { metadata, .. } => metadata,
4406            HydroNode::ReduceKeyed { metadata, .. } => metadata,
4407            HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
4408            HydroNode::ExternalInput { metadata, .. } => metadata,
4409            HydroNode::Network { metadata, .. } => metadata,
4410            HydroNode::Counter { metadata, .. } => metadata,
4411        }
4412    }
4413
4414    pub fn input(&self) -> Vec<&HydroNode> {
4415        match self {
4416            HydroNode::Placeholder => {
4417                panic!()
4418            }
4419            HydroNode::Source { .. }
4420            | HydroNode::SingletonSource { .. }
4421            | HydroNode::ExternalInput { .. }
4422            | HydroNode::CycleSource { .. }
4423            | HydroNode::Tee { .. }
4424            | HydroNode::Partition { .. } => {
4425                // Tee/Partition should find their input in separate special ways
4426                vec![]
4427            }
4428            HydroNode::Cast { inner, .. }
4429            | HydroNode::ObserveNonDet { inner, .. }
4430            | HydroNode::YieldConcat { inner, .. }
4431            | HydroNode::BeginAtomic { inner, .. }
4432            | HydroNode::EndAtomic { inner, .. }
4433            | HydroNode::Batch { inner, .. } => {
4434                vec![inner]
4435            }
4436            HydroNode::Chain { first, second, .. } => {
4437                vec![first, second]
4438            }
4439            HydroNode::ChainFirst { first, second, .. } => {
4440                vec![first, second]
4441            }
4442            HydroNode::CrossProduct { left, right, .. }
4443            | HydroNode::CrossSingleton { left, right, .. }
4444            | HydroNode::Join { left, right, .. }
4445            | HydroNode::JoinHalf { left, right, .. } => {
4446                vec![left, right]
4447            }
4448            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
4449                vec![pos, neg]
4450            }
4451            HydroNode::Map { input, .. }
4452            | HydroNode::FlatMap { input, .. }
4453            | HydroNode::FlatMapStreamBlocking { input, .. }
4454            | HydroNode::Filter { input, .. }
4455            | HydroNode::FilterMap { input, .. }
4456            | HydroNode::Sort { input, .. }
4457            | HydroNode::DeferTick { input, .. }
4458            | HydroNode::Enumerate { input, .. }
4459            | HydroNode::Inspect { input, .. }
4460            | HydroNode::Unique { input, .. }
4461            | HydroNode::Network { input, .. }
4462            | HydroNode::Counter { input, .. }
4463            | HydroNode::ResolveFutures { input, .. }
4464            | HydroNode::ResolveFuturesBlocking { input, .. }
4465            | HydroNode::ResolveFuturesOrdered { input, .. }
4466            | HydroNode::Fold { input, .. }
4467            | HydroNode::FoldKeyed { input, .. }
4468            | HydroNode::Reduce { input, .. }
4469            | HydroNode::ReduceKeyed { input, .. }
4470            | HydroNode::Scan { input, .. }
4471            | HydroNode::ScanAsyncBlocking { input, .. } => {
4472                vec![input]
4473            }
4474            HydroNode::ReduceKeyedWatermark {
4475                input, watermark, ..
4476            } => {
4477                vec![input, watermark]
4478            }
4479        }
4480    }
4481
4482    pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
4483        self.input()
4484            .iter()
4485            .map(|input_node| input_node.metadata())
4486            .collect()
4487    }
4488
4489    /// Returns `true` if this node is a Tee or Partition whose inner Rc
4490    /// has other live references, meaning the upstream is already driven
4491    /// by another consumer and does not need a Null sink.
4492    pub fn is_shared_with_others(&self) -> bool {
4493        match self {
4494            HydroNode::Tee { inner, .. } | HydroNode::Partition { inner, .. } => {
4495                Rc::strong_count(&inner.0) > 1
4496            }
4497            _ => false,
4498        }
4499    }
4500
4501    pub fn print_root(&self) -> String {
4502        match self {
4503            HydroNode::Placeholder => {
4504                panic!()
4505            }
4506            HydroNode::Cast { .. } => "Cast()".to_owned(),
4507            HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
4508            HydroNode::Source { source, .. } => format!("Source({:?})", source),
4509            HydroNode::SingletonSource {
4510                value,
4511                first_tick_only,
4512                ..
4513            } => format!(
4514                "SingletonSource({:?}, first_tick_only={})",
4515                value, first_tick_only
4516            ),
4517            HydroNode::CycleSource { cycle_id, .. } => format!("CycleSource({})", cycle_id),
4518            HydroNode::Tee { inner, .. } => format!("Tee({})", inner.0.borrow().print_root()),
4519            HydroNode::Partition { f, is_true, .. } => {
4520                format!("Partition({:?}, is_true={})", f, is_true)
4521            }
4522            HydroNode::YieldConcat { .. } => "YieldConcat()".to_owned(),
4523            HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_owned(),
4524            HydroNode::EndAtomic { .. } => "EndAtomic()".to_owned(),
4525            HydroNode::Batch { .. } => "Batch()".to_owned(),
4526            HydroNode::Chain { first, second, .. } => {
4527                format!("Chain({}, {})", first.print_root(), second.print_root())
4528            }
4529            HydroNode::ChainFirst { first, second, .. } => {
4530                format!(
4531                    "ChainFirst({}, {})",
4532                    first.print_root(),
4533                    second.print_root()
4534                )
4535            }
4536            HydroNode::CrossProduct { left, right, .. } => {
4537                format!(
4538                    "CrossProduct({}, {})",
4539                    left.print_root(),
4540                    right.print_root()
4541                )
4542            }
4543            HydroNode::CrossSingleton { left, right, .. } => {
4544                format!(
4545                    "CrossSingleton({}, {})",
4546                    left.print_root(),
4547                    right.print_root()
4548                )
4549            }
4550            HydroNode::Join { left, right, .. } => {
4551                format!("Join({}, {})", left.print_root(), right.print_root())
4552            }
4553            HydroNode::JoinHalf { left, right, .. } => {
4554                format!("JoinHalf({}, {})", left.print_root(), right.print_root())
4555            }
4556            HydroNode::Difference { pos, neg, .. } => {
4557                format!("Difference({}, {})", pos.print_root(), neg.print_root())
4558            }
4559            HydroNode::AntiJoin { pos, neg, .. } => {
4560                format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
4561            }
4562            HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_owned(),
4563            HydroNode::ResolveFuturesBlocking { .. } => "ResolveFuturesBlocking()".to_owned(),
4564            HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_owned(),
4565            HydroNode::Map { f, .. } => format!("Map({:?})", f),
4566            HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
4567            HydroNode::FlatMapStreamBlocking { f, .. } => format!("FlatMapStreamBlocking({:?})", f),
4568            HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
4569            HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
4570            HydroNode::DeferTick { .. } => "DeferTick()".to_owned(),
4571            HydroNode::Enumerate { .. } => "Enumerate()".to_owned(),
4572            HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
4573            HydroNode::Unique { .. } => "Unique()".to_owned(),
4574            HydroNode::Sort { .. } => "Sort()".to_owned(),
4575            HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
4576            HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
4577            HydroNode::ScanAsyncBlocking { init, acc, .. } => {
4578                format!("ScanAsyncBlocking({:?}, {:?})", init, acc)
4579            }
4580            HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
4581            HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
4582            HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
4583            HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
4584            HydroNode::Network { .. } => "Network()".to_owned(),
4585            HydroNode::ExternalInput { .. } => "ExternalInput()".to_owned(),
4586            HydroNode::Counter { tag, duration, .. } => {
4587                format!("Counter({:?}, {:?})", tag, duration)
4588            }
4589        }
4590    }
4591}
4592
/// Wires up one network channel between two locations for deployment `D`.
///
/// The sending and receiving nodes are looked up in `processes` or
/// `clusters` according to the kind of each [`LocationId`]. A fresh port is
/// taken from each node, and the matching `Deploy` hooks are invoked:
/// `o2o_*` (process to process), `o2m_*` (process to cluster), `m2o_*`
/// (cluster to process) or `m2m_*` (cluster to cluster).
///
/// Returns `(sink, source, connect_fn)`: the two expressions produced by
/// the `*_sink_source` hook, followed by the closure produced by the
/// `*_connect` hook.
///
/// # Panics
///
/// Panics if a referenced process or cluster is missing from the
/// corresponding map, or if either location is a `Tick` or `Atomic`.
#[cfg(feature = "build")]
fn instantiate_network<'a, D>(
    env: &mut D::InstantiateEnv,
    from_location: &LocationId,
    to_location: &LocationId,
    processes: &SparseSecondaryMap<LocationKey, D::Process>,
    clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
    name: Option<&str>,
    networking_info: &crate::networking::NetworkingInfo,
) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
where
    D: Deploy<'a>,
{
    // Fetches a clone of the process registered under `key`, or panics.
    let process_at = |key: LocationKey| match processes.get(key) {
        Some(node) => node.clone(),
        None => panic!("A process used in the graph was not instantiated: {}", key),
    };
    // Fetches a clone of the cluster registered under `key`, or panics.
    let cluster_at = |key: LocationKey| match clusters.get(key) {
        Some(node) => node.clone(),
        None => panic!("A cluster used in the graph was not instantiated: {}", key),
    };

    let ((sink, source), connect_fn) = match (from_location, to_location) {
        (&LocationId::Process(from), &LocationId::Process(to)) => {
            let sender = process_at(from);
            let receiver = process_at(to);
            let sink_port = sender.next_port();
            let source_port = receiver.next_port();

            let endpoints = D::o2o_sink_source(
                env,
                &sender,
                &sink_port,
                &receiver,
                &source_port,
                name,
                networking_info,
            );
            let connect = D::o2o_connect(&sender, &sink_port, &receiver, &source_port);
            (endpoints, connect)
        }
        (&LocationId::Process(from), &LocationId::Cluster(to)) => {
            let sender = process_at(from);
            let receiver = cluster_at(to);
            let sink_port = sender.next_port();
            let source_port = receiver.next_port();

            let endpoints = D::o2m_sink_source(
                env,
                &sender,
                &sink_port,
                &receiver,
                &source_port,
                name,
                networking_info,
            );
            let connect = D::o2m_connect(&sender, &sink_port, &receiver, &source_port);
            (endpoints, connect)
        }
        (&LocationId::Cluster(from), &LocationId::Process(to)) => {
            let sender = cluster_at(from);
            let receiver = process_at(to);
            let sink_port = sender.next_port();
            let source_port = receiver.next_port();

            let endpoints = D::m2o_sink_source(
                env,
                &sender,
                &sink_port,
                &receiver,
                &source_port,
                name,
                networking_info,
            );
            let connect = D::m2o_connect(&sender, &sink_port, &receiver, &source_port);
            (endpoints, connect)
        }
        (&LocationId::Cluster(from), &LocationId::Cluster(to)) => {
            let sender = cluster_at(from);
            let receiver = cluster_at(to);
            let sink_port = sender.next_port();
            let source_port = receiver.next_port();

            let endpoints = D::m2m_sink_source(
                env,
                &sender,
                &sink_port,
                &receiver,
                &source_port,
                name,
                networking_info,
            );
            let connect = D::m2m_connect(&sender, &sink_port, &receiver, &source_port);
            (endpoints, connect)
        }
        // Tick and Atomic locations are not handled as network endpoints.
        (LocationId::Tick(_, _), _)
        | (_, LocationId::Tick(_, _))
        | (LocationId::Atomic(_), _)
        | (_, LocationId::Atomic(_)) => panic!(),
    };
    (sink, source, connect_fn)
}
4734
4735#[cfg(test)]
4736mod serde_test;
4737
#[cfg(test)]
mod test {
    use std::mem::size_of;

    use stageleft::{QuotedWithContext, q};

    use super::*;

    /// Pins the size of `HydroNode` so that growth of the enum is noticed.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_node_size() {
        let node_bytes = size_of::<HydroNode>();
        assert_eq!(node_bytes, 248);
    }

    /// Pins the size of `HydroRoot` so that growth of the enum is noticed.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_root_size() {
        let root_bytes = size_of::<HydroRoot>();
        assert_eq!(root_bytes, 136);
    }

    /// An expression containing no `q!` expansion must pass through unchanged.
    #[test]
    fn test_simplify_q_macro_basic() {
        let plain = syn::parse_str::<syn::Expr>("x + y").unwrap();
        let simplified = simplify_q_macro(plain.clone());
        assert_eq!(simplified, plain);
    }

    /// Feeds a real spliced stageleft closure through `simplify_q_macro` and
    /// snapshots the resulting tokens.
    #[test]
    fn test_simplify_q_macro_actual_stageleft_call() {
        let spliced = q!(|x: usize| x + 1).splice_fn1_ctx(&());
        let result = simplify_q_macro(spliced);
        // The visitor is expected to recognise the
        // stageleft::runtime_support::fn_* pattern and collapse it to q!(...).
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }

    /// Same as above, but the quoted body is a block that ends in a closure
    /// rather than starting with a pipe.
    #[test]
    fn test_closure_no_pipe_at_start() {
        let spliced = q!({
            let foo = 123;
            move |b: usize| b + foo
        })
        .splice_fn1_ctx(&());
        let result = simplify_q_macro(spliced);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }
}