// hydro_lang/compile/built.rs

1use std::marker::PhantomData;
2
3use dfir_lang::graph::{
4    DfirGraph, FlatGraphBuilderOutput, eliminate_extra_unions_tees, partition_graph,
5};
6use slotmap::{SecondaryMap, SlotMap, SparseSecondaryMap};
7
8use super::compiled::CompiledFlow;
9use super::deploy::{DeployFlow, DeployResult};
10use super::deploy_provider::{ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec};
11use super::ir::{HydroRoot, emit};
12use crate::location::{Cluster, External, LocationKey, LocationType, Process};
13#[cfg(stageleft_runtime)]
14#[cfg(feature = "sim")]
15use crate::sim::{flow::SimFlow, graph::SimNode};
16use crate::staging_util::Invariant;
17#[cfg(stageleft_runtime)]
18#[cfg(feature = "viz")]
19use crate::viz::api::GraphApi;
20
21pub struct BuiltFlow<'a> {
22    pub(super) ir: Vec<HydroRoot>,
23    pub(super) locations: SlotMap<LocationKey, LocationType>,
24    pub(super) location_names: SecondaryMap<LocationKey, String>,
25
26    /// Application name used in telemetry.
27    pub(super) flow_name: String,
28
29    pub(super) _phantom: Invariant<'a>,
30}
31
32pub(crate) fn build_inner(ir: &mut Vec<HydroRoot>) -> SecondaryMap<LocationKey, DfirGraph> {
33    emit(ir)
34        .into_iter()
35        .map(|(k, v)| {
36            let FlatGraphBuilderOutput { mut flat_graph, .. } =
37                v.build().expect("Failed to build DFIR flat graph.");
38            eliminate_extra_unions_tees(&mut flat_graph);
39            let partitioned_graph =
40                partition_graph(flat_graph).expect("Failed to partition (cycle detected).");
41            (k, partitioned_graph)
42        })
43        .collect()
44}
45
46impl<'a> BuiltFlow<'a> {
47    /// Returns all [`HydroRoot`]s in the IR.
48    pub fn ir(&self) -> &[HydroRoot] {
49        &self.ir
50    }
51
52    /// Serialize the IR as JSON.
53    #[cfg(feature = "viz")]
54    pub fn ir_json(&self) -> Result<String, serde_json::Error> {
55        super::ir::serialize_dedup_shared(|| serde_json::to_string_pretty(&self.ir))
56    }
57
58    /// Returns all raw location ID -> location name mappings.
59    pub fn location_names(&self) -> &SecondaryMap<LocationKey, String> {
60        &self.location_names
61    }
62
63    /// Get a GraphApi instance for this built flow
64    #[cfg(stageleft_runtime)]
65    #[cfg(feature = "viz")]
66    pub fn graph_api(&self) -> GraphApi<'_> {
67        GraphApi::new(&self.ir, self.location_names())
68    }
69
70    /// Render graph to string in the given format.
71    #[cfg(feature = "viz")]
72    pub fn render_graph(
73        &self,
74        format: crate::viz::config::GraphType,
75        use_short_labels: bool,
76        show_metadata: bool,
77    ) -> String {
78        self.graph_api()
79            .render(format, use_short_labels, show_metadata)
80    }
81
82    /// Write graph to file.
83    #[cfg(feature = "viz")]
84    pub fn write_graph_to_file(
85        &self,
86        format: crate::viz::config::GraphType,
87        filename: &str,
88        use_short_labels: bool,
89        show_metadata: bool,
90    ) -> Result<(), Box<dyn std::error::Error>> {
91        self.graph_api()
92            .write_to_file(format, filename, use_short_labels, show_metadata)
93    }
94
95    /// Generate graph based on CLI config. Returns Some(path) if written.
96    #[cfg(feature = "viz")]
97    pub fn generate_graph(
98        &self,
99        config: &crate::viz::config::GraphConfig,
100    ) -> Result<Option<String>, Box<dyn std::error::Error>> {
101        self.graph_api().generate_graph(config)
102    }
103
104    pub fn optimize_with(mut self, f: impl FnOnce(&mut [HydroRoot])) -> Self {
105        f(&mut self.ir);
106        self
107    }
108
109    pub fn with_default_optimize<D: Deploy<'a>>(self) -> DeployFlow<'a, D> {
110        self.into_deploy()
111    }
112
113    #[cfg(feature = "sim")]
114    /// Creates a simulation for this builder, which can be used to run deterministic simulations
115    /// of the Hydro program.
116    pub fn sim(self) -> SimFlow<'a> {
117        use std::cell::RefCell;
118        use std::rc::Rc;
119
120        use slotmap::SparseSecondaryMap;
121
122        use crate::sim::graph::SimNodePort;
123
124        let shared_port_counter = Rc::new(RefCell::new(SimNodePort::default()));
125
126        let mut processes = SparseSecondaryMap::new();
127        let mut clusters = SparseSecondaryMap::new();
128        let externals = SparseSecondaryMap::new();
129
130        for (key, loc) in self.locations.iter() {
131            match loc {
132                LocationType::Process => {
133                    processes.insert(
134                        key,
135                        SimNode {
136                            shared_port_counter: shared_port_counter.clone(),
137                        },
138                    );
139                }
140                LocationType::Cluster => {
141                    clusters.insert(
142                        key,
143                        SimNode {
144                            shared_port_counter: shared_port_counter.clone(),
145                        },
146                    );
147                }
148                LocationType::External => {
149                    panic!("Sim cannot have externals");
150                }
151            }
152        }
153
154        SimFlow {
155            ir: self.ir,
156            processes,
157            clusters,
158            externals,
159            cluster_max_sizes: SparseSecondaryMap::new(),
160            externals_port_registry: Default::default(),
161            test_safety_only: false,
162            unit_test_fuzz_iterations: 8192,
163            _phantom: PhantomData,
164        }
165    }
166
167    pub fn into_deploy<D: Deploy<'a>>(self) -> DeployFlow<'a, D> {
168        let (processes, clusters, externals) = Default::default();
169        DeployFlow {
170            ir: self.ir,
171            locations: self.locations,
172            location_names: self.location_names,
173            processes,
174            clusters,
175            externals,
176            sidecars: SparseSecondaryMap::new(),
177            flow_name: self.flow_name,
178            _phantom: PhantomData,
179        }
180    }
181
182    pub fn with_process<P, D: Deploy<'a>>(
183        self,
184        process: &Process<P>,
185        spec: impl IntoProcessSpec<'a, D>,
186    ) -> DeployFlow<'a, D> {
187        self.into_deploy().with_process(process, spec)
188    }
189
190    pub fn with_remaining_processes<D: Deploy<'a>, S: IntoProcessSpec<'a, D> + 'a>(
191        self,
192        spec: impl Fn() -> S,
193    ) -> DeployFlow<'a, D> {
194        self.into_deploy().with_remaining_processes(spec)
195    }
196
197    pub fn with_external<P, D: Deploy<'a>>(
198        self,
199        process: &External<P>,
200        spec: impl ExternalSpec<'a, D>,
201    ) -> DeployFlow<'a, D> {
202        self.into_deploy().with_external(process, spec)
203    }
204
205    pub fn with_remaining_externals<D: Deploy<'a>, S: ExternalSpec<'a, D> + 'a>(
206        self,
207        spec: impl Fn() -> S,
208    ) -> DeployFlow<'a, D> {
209        self.into_deploy().with_remaining_externals(spec)
210    }
211
212    pub fn with_cluster<C, D: Deploy<'a>>(
213        self,
214        cluster: &Cluster<C>,
215        spec: impl ClusterSpec<'a, D>,
216    ) -> DeployFlow<'a, D> {
217        self.into_deploy().with_cluster(cluster, spec)
218    }
219
220    pub fn with_remaining_clusters<D: Deploy<'a>, S: ClusterSpec<'a, D> + 'a>(
221        self,
222        spec: impl Fn() -> S,
223    ) -> DeployFlow<'a, D> {
224        self.into_deploy().with_remaining_clusters(spec)
225    }
226
227    pub fn compile<D: Deploy<'a, InstantiateEnv = ()>>(self) -> CompiledFlow<'a> {
228        self.into_deploy::<D>().compile()
229    }
230
231    pub fn deploy<D: Deploy<'a>>(self, env: &mut D::InstantiateEnv) -> DeployResult<'a, D> {
232        self.into_deploy::<D>().deploy(env)
233    }
234}