CIRCT 23.0.0git
Loading...
Searching...
No Matches
simulator.py
Go to the documentation of this file.
1# Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
2# See https://llvm.org/LICENSE.txt for license information.
3# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
4
5import os
6import re
7import signal
8import socket
9import subprocess
10import time
11from pathlib import Path
12from typing import Dict, List, Optional, Callable, IO, Union
13import threading
14
15_thisdir = Path(__file__).parent
16CosimCollateralDir = _thisdir
17
18
def is_port_open(port) -> bool:
    """Check whether a TCP port on the local loopback interface is open.

    Args:
        port: TCP port number to probe on 127.0.0.1.

    Returns:
        True if a TCP connection to 127.0.0.1:port succeeded, else False.
    """
    # The context manager closes the socket even when connect_ex() raises
    # (for example on an out-of-range port), so no descriptor is leaked.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        # connect_ex() reports failure through its return code; 0 is success.
        return sock.connect_ex(('127.0.0.1', port)) == 0
25
26
28
def __init__(self, top: str) -> None:
    """Set up an empty source list for the design whose top module is `top`.

    Args:
        top: Name of the top module.
    """
    # Name of the top module.
    self.top = top
    # User source files; filled in by add_file() / add_dir().
    self.user: List[Path] = []
    # DPI shared objects (library base names, resolved by dpi_so_paths()).
    self.dpi_so: List[str] = ["EsiCosimDpiServer"]
    # DPI SV files, all located in the cosim collateral directory.
    sv_names = (
        "Cosim_DpiPkg.sv",
        "Cosim_Endpoint.sv",
        "Cosim_CycleCount.sv",
        "Cosim_Manifest.sv",
    )
    self.dpi_sv: List[Path] = [CosimCollateralDir / sv for sv in sv_names]
43
def add_file(self, file: Path):
    """Append one RTL file to the user source list.

    Args:
        file: Path of the file to add.

    Raises:
        FileNotFoundError: If `file` is not an existing regular file.
    """
    # Guard clause: reject anything that is not a regular file up front.
    if not file.is_file():
        raise FileNotFoundError(f"File {file} does not exist")
    self.user.append(file)
50
def add_dir(self, dir: Path):
    """Recursively add every `.sv` / `.v` file below a directory.

    Entries are visited in sorted order; sub-directories are descended into
    and other files are ignored.

    Args:
        dir: Directory to scan.
    """
    rtl_suffixes = (".sv", ".v")
    for entry in sorted(dir.iterdir()):
        if entry.is_dir():
            # Descend into sub-directories.
            self.add_dir(entry)
            continue
        if entry.is_file() and entry.suffix in rtl_suffixes:
            self.user.append(entry)
58
def dpi_so_paths(self) -> List[Path]:
    """Resolve every entry of `self.dpi_so` to a shared-object file on disk.

    For each library name, the directories listed in the `LD_LIBRARY_PATH`
    of `Simulator.get_env()` are searched first, followed by the `lib`
    directories one and two levels above this file's directory.

    Returns:
        One path per entry in `self.dpi_so`, in the same order.

    Raises:
        FileNotFoundError: If a library is not found in any searched directory.
    """

    def locate(name: str) -> Path:
        # Directories named in LD_LIBRARY_PATH take precedence.
        ld_entries = Simulator.get_env().get("LD_LIBRARY_PATH", "").split(":")
        search_dirs = [Path(entry) for entry in ld_entries]
        # Then fall back to a couple of parent directories relative to this
        # file.
        search_dirs.append(_thisdir.parent / "lib")
        search_dirs.append(_thisdir.parent.parent / "lib")

        for directory in search_dirs:
            # Windows uses "<name>.dll"; everything else uses "lib<name>.so".
            if os.name == "nt":
                candidate = directory / f"{name}.dll"
            else:
                candidate = directory / f"lib{name}.so"
            if candidate.exists():
                return candidate

        raise FileNotFoundError(f"Could not find {name}.so")

    return [locate(lib_name) for lib_name in self.dpi_so]
90
@property
def rtl_sources(self) -> List[Path]:
    """All RTL sources as a new list: the DPI SV files, then the user files."""
    return [*self.dpi_sv, *self.user]
95
96
98
def __init__(self,
             proc: subprocess.Popen,
             port: int,
             threads: Optional[List[threading.Thread]] = None,
             gui: bool = False):
    """Record the handles that describe one running simulation.

    Args:
        proc: The simulator subprocess.
        port: Port number associated with the simulation.
        threads: Threads to be joined by force_stop(); a missing or empty
            value is stored as a fresh empty list.
        gui: Whether the simulation was started in GUI mode.
    """
    self.proc, self.port, self.gui = proc, port, gui
    self.threads: List[threading.Thread] = threads if threads else []
108
def force_stop(self):
    """Make sure to stop the simulation no matter what.

    Sends an interrupt to the simulator's process group, gives it one second
    to exit, kills it if it is still running, and finally joins the threads
    stored in `self.threads`. Calling this on a simulation that has already
    exited is harmless.
    """
    if self.proc:
        try:
            if os.name == "posix":
                os.killpg(os.getpgid(self.proc.pid), signal.SIGINT)
            else:
                # os.killpg/os.getpgid do not exist on Windows. A process
                # started with CREATE_NEW_PROCESS_GROUP is interrupted with
                # CTRL_BREAK_EVENT instead.
                self.proc.send_signal(signal.CTRL_BREAK_EVENT)
        except ProcessLookupError:
            # The process (group) is already gone; nothing to interrupt.
            pass
        # Allow the simulation time to flush its outputs.
        try:
            self.proc.wait(timeout=1.0)
        except subprocess.TimeoutExpired:
            # If the simulation doesn't exit of its own free will, kill it
            # and reap it so that no zombie process is left behind.
            self.proc.kill()
            self.proc.wait()

    # Join reader threads (they should exit once pipes are closed).
    for t in self.threads:
        t.join()
123
124
126
# A compile step is either an argument vector for an external command or a
# zero-argument Python callable that returns an optional integer status.
CompileCommand = List[str]
CompileFunction = Callable[[], Union[int, None]]
CompileStep = Union[CompileCommand, CompileFunction]

# Not every RTL simulator reports errors on stderr; some send everything to
# stdout. A backend with that behavior can say so by overriding this flag.
UsesStderr = True
135
def __init__(self,
             sources: SourceFiles,
             run_dir: Path,
             debug: bool,
             save_waveform: bool = False,
             run_stdout_callback: Optional[Callable[[str], None]] = None,
             run_stderr_callback: Optional[Callable[[str], None]] = None,
             compile_stdout_callback: Optional[Callable[[str], None]] = None,
             compile_stderr_callback: Optional[Callable[[str], None]] = None,
             make_default_logs: bool = True,
             macro_definitions: Optional[Dict[str, str]] = None):
    """Simulator base class.

    Each of the four output streams (compile/run x stdout/stderr) is routed
    to a line-based callback. When the caller supplies none for a stream, a
    log file in `run_dir` is used instead, unless `make_default_logs` is
    False, in which case that stream has no callback.

    Args:
        sources: SourceFiles describing RTL/DPI inputs.
        run_dir: Directory where build/run artifacts are placed.
        debug: Enable cosim debug mode.
        save_waveform: When True and debug=True, dump simulator waveforms to a
            waveform file. The exact format depends on the backend (e.g. FST
            for Verilator, VCD for Questa). Requires debug to be enabled.
        run_stdout_callback: Line-based callback for runtime stdout.
        run_stderr_callback: Line-based callback for runtime stderr.
        compile_stdout_callback: Line-based callback for compile stdout.
        compile_stderr_callback: Line-based callback for compile stderr.
        make_default_logs: If True and corresponding callback is not
            supplied, create log file and emit via internally-created
            callback.
        macro_definitions: Optional dictionary of macro definitions to be
            defined during compilation.
    """
    self.sources = sources
    self.run_dir = run_dir
    self.debug = debug
    self.save_waveform = save_waveform
    self.macro_definitions = macro_definitions

    # Every log file handle opened below is also recorded here.
    self._default_files: List[IO[str]] = []

    def _make_sink(user_cb: Optional[Callable[[str], None]], log_name: str):
        """Return `(callback, log_handle)` for one output stream.

        A caller-supplied callback is passed through with no file. Without
        one, a log file is created only when make_default_logs is set;
        otherwise both elements are None.
        """
        if user_cb is None and make_default_logs:
            log_path = self.run_dir / log_name
            log_path.parent.mkdir(parents=True, exist_ok=True)
            # "w+" so the log can be read back later through the same handle.
            handle = log_path.open("w+")
            self._default_files.append(handle)

            def _log_line(line: str, _handle=handle):
                # Flush after every line so the file is always current.
                _handle.write(line + "\n")
                _handle.flush()

            return _log_line, handle
        return user_cb, None

    # Compile streams first, then run streams; stdout before stderr.
    self._compile_stdout_cb, self._compile_stdout_log = _make_sink(
        compile_stdout_callback, 'compile_stdout.log')
    self._compile_stderr_cb, self._compile_stderr_log = _make_sink(
        compile_stderr_callback, 'compile_stderr.log')
    self._run_stdout_cb, self._run_stdout_log = _make_sink(
        run_stdout_callback, 'sim_stdout.log')
    self._run_stderr_cb, self._run_stderr_log = _make_sink(
        run_stderr_callback, 'sim_stderr.log')
210
@staticmethod
def get_env() -> Dict[str, str]:
    """Get the environment variables to locate shared objects.

    Returns:
        A copy of `os.environ` in which `LIBRARY_PATH` and `LD_LIBRARY_PATH`
        have the `lib` directories one and two levels above this file's
        directory appended. The process environment itself is not modified.
    """
    env = os.environ.copy()
    lib_dirs = [
        str(_thisdir.parent / "lib"),
        str(_thisdir.parent.parent / "lib"),
    ]
    for var in ("LIBRARY_PATH", "LD_LIBRARY_PATH"):
        existing = env.get(var, "")
        # Only keep the existing value when it is non-empty. Joining an empty
        # value would produce a leading ":", i.e. an empty search-path entry,
        # which the dynamic linker treats as the current working directory.
        parts = ([existing] if existing else []) + lib_dirs
        env[var] = ":".join(parts)
    return env
221
def compile_commands(self) -> List[CompileStep]:
    """Return the compile steps for the simulator.

    Each step may be either a shell command (`List[str]`) or a Python callback
    (`Callable[[], Optional[int]]`). Python callbacks should return `0` or
    `None` on success and a non-zero integer on failure.

    Raises:
        NotImplementedError: Always, in this base class; subclasses must
            override this method.
    """
    # Raise explicitly rather than `assert False`: assert statements are
    # removed under `python -O`, which would turn this into a silent
    # `return None`.
    raise NotImplementedError("Must be implemented by subclass")
230
def _run_compile_command(self, cmd: CompileCommand) -> int:
    """Run one external compile command to completion.

    Output is streamed to the compile stdout/stderr callbacks. On a non-zero
    exit code a failure banner is printed, followed by the contents of
    whichever default compile log files exist.

    Args:
        cmd: Argument vector of the command to run.

    Returns:
        The command's exit code, or 1 if no integer exit code was obtained.
    """
    status = self._start_process_with_callbacks(
        cmd,
        env=Simulator.get_env(),
        cwd=None,
        stdout_cb=self._compile_stdout_cb,
        stderr_cb=self._compile_stderr_cb,
        wait=True)
    if not isinstance(status, int):
        return 1
    if status != 0:
        print("====== Compilation failure")

        # Always print both stdout and stderr so that linker errors (which go
        # to stdout for cmake/ninja) are not silently hidden.
        for log in (self._compile_stdout_log, self._compile_stderr_log):
            if log is None:
                continue
            # Rewind: the log handle is positioned at its end after writing.
            log.seek(0)
            text = log.read()
            if text:
                print(text)
    return status
255
def _run_compile_step(self, step: CompileStep) -> int:
    """Execute a single compile step and return its integer status.

    Args:
        step: Either a command (run via `_run_compile_command`) or a
            callable invoked with no arguments.

    Returns:
        The command's status, or the callable's return value with `None`
        mapped to 0.

    Raises:
        TypeError: If a callable step returns something other than an int or
            None.
    """
    if not callable(step):
        return self._run_compile_command(step)
    outcome = step()
    if outcome is None:
        # A callback that returns nothing counts as success.
        return 0
    if isinstance(outcome, int):
        return outcome
    raise TypeError("compile step callback must return int or None")
265
def compile(self) -> int:
    """Run the compile steps in order, stopping at the first failure.

    The run directory is created (if needed) after the steps have been
    obtained and before any of them runs.

    Returns:
        The first non-zero step status, or 0 when every step succeeded.
    """
    steps = self.compile_commands()
    self.run_dir.mkdir(parents=True, exist_ok=True)
    # map() is lazy, so steps after the first failing one are never executed.
    statuses = map(self._run_compile_step, steps)
    return next((status for status in statuses if status != 0), 0)
274
def run_command(self, gui: bool) -> List[str]:
    """Return the command to run the simulation.

    Args:
        gui: Whether the simulation is to be run in GUI mode.

    Raises:
        NotImplementedError: Always, in this base class; subclasses must
            override this method.
    """
    # Raise explicitly rather than `assert False`: assert statements are
    # removed under `python -O`, which would turn this into a silent
    # `return None`.
    raise NotImplementedError("Must be implemented by subclass")
278
@property
def waveform_extension(self) -> str:
    """Extension (including the leading dot) used for waveform dump files.

    The default is VCD, which is what the generic SV driver produces via
    ``$dumpfile/$dumpvars``. Backends that write another format should
    override this; the Verilator C++ driver, for instance, writes FST (via
    ``VerilatedFstC``).
    """
    return ".vcd"
288
def run_proc(self, gui: bool = False) -> SimProcess:
    """Start the simulation process and wait until its RPC port accepts
    connections.

    If user-provided stdout/stderr sinks were supplied in the constructor,
    they are used. Otherwise, log files are created in `run_dir`.

    Args:
        gui: Run the simulator in GUI mode. Forced to True when the
            `COSIM_GUI` environment variable is set to anything but "0".

    Returns:
        A SimProcess holding the Popen object, the port the simulation is
        listening on, the output reader threads and the effective GUI flag.

    Raises:
        Exception: If the simulator exits early, never writes its config
            file or port (outside GUI mode), or never opens the RPC port.
            The simulator process is stopped before the exception
            propagates.
    """
    self.run_dir.mkdir(parents=True, exist_ok=True)

    if os.environ.get("COSIM_GUI", "0") != "0":
        gui = True

    # Erase the config file if it exists. We don't want to read
    # an old config.
    portFileName = self.run_dir / "cosim.cfg"
    if os.path.exists(portFileName):
        os.remove(portFileName)

    # Run the simulation.
    simEnv = Simulator.get_env()
    if self.debug:
        debug_file = (self.run_dir / "cosim_debug.log").resolve()
        simEnv["COSIM_DEBUG_FILE"] = str(debug_file)
        if "DEBUG_PERIOD" not in simEnv:
            # Slow the simulation down to one tick per millisecond.
            simEnv["DEBUG_PERIOD"] = "1"
        if self.save_waveform:
            waveform_file = (
                self.run_dir /
                f"cosim_waveform{self.waveform_extension}").resolve()
            simEnv["SAVE_WAVE"] = str(waveform_file)
    rcmd = self.run_command(gui)
    # Start process with asynchronous output capture.
    proc, threads = self._start_process_with_callbacks(
        rcmd,
        env=simEnv,
        cwd=self.run_dir,
        stdout_cb=self._run_stdout_cb,
        stderr_cb=self._run_stderr_cb,
        wait=False)

    try:
        # Wait for the simulation to write its config file. In GUI mode
        # there is no timeout.
        checkCount = 0
        while not os.path.exists(portFileName):
            if proc.poll() is not None:
                # Without this check the missing file would only surface as
                # a confusing FileNotFoundError further down.
                raise Exception("Simulation exited early")
            time.sleep(0.1)
            checkCount += 1
            if checkCount > 500 and not gui:
                raise Exception(f"Cosim never wrote cfg file: {portFileName}")

        # Get the port which the simulation RPC selected. The file may exist
        # before the port line has been written, so poll it (with a sleep
        # and the same timeout policy) instead of spinning.
        port_line = re.compile(r"port: (\d+)")
        port = -1
        checkCount = 0
        while port < 0:
            with open(portFileName, "r") as portFile:
                for line in portFile:
                    m = port_line.match(line)
                    if m is not None:
                        port = int(m.group(1))
            if port >= 0:
                break
            if proc.poll() is not None:
                raise Exception("Simulation exited early")
            checkCount += 1
            if checkCount > 500 and not gui:
                raise Exception(
                    f"Cosim never wrote a port to cfg file: {portFileName}")
            time.sleep(0.1)

        # Wait for the simulation to start accepting RPC connections.
        checkCount = 0
        while not is_port_open(port):
            checkCount += 1
            if checkCount > 200:
                raise Exception(f"Cosim RPC port ({port}) never opened")
            if proc.poll() is not None:
                raise Exception("Simulation exited early")
            time.sleep(0.05)
    except BaseException:
        # The caller never receives a handle on failure, so the simulator
        # has to be stopped here or it would be left running.
        if proc.poll() is None:
            SimProcess(proc=proc, port=-1, threads=threads,
                       gui=gui).force_stop()
        raise
    return SimProcess(proc=proc, port=port, threads=threads, gui=gui)
357
359 self, cmd: List[str], env: Optional[Dict[str, str]], cwd: Optional[Path],
360 stdout_cb: Optional[Callable[[str],
361 None]], stderr_cb: Optional[Callable[[str],
362 None]],
363 wait: bool) -> int | tuple[subprocess.Popen, List[threading.Thread]]:
364 """Start a subprocess and stream its stdout/stderr to callbacks.
365
366 If wait is True, blocks until process completes and returns its exit code.
367 If wait is False, returns the Popen object (threads keep streaming).
368 """
369 if os.name == "posix":
370 proc = subprocess.Popen(cmd,
371 stdout=subprocess.PIPE,
372 stderr=subprocess.PIPE,
373 env=env,
374 cwd=cwd,
375 text=True,
376 preexec_fn=os.setsid)
377 else: # windows
378 proc = subprocess.Popen(cmd,
379 stdout=subprocess.PIPE,
380 stderr=subprocess.PIPE,
381 env=env,
382 cwd=cwd,
383 text=True,
384 creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
385
386 def _reader(pipe, cb):
387 if pipe is None:
388 return
389 for raw in pipe:
390 if raw.endswith('\n'):
391 raw = raw[:-1]
392 if cb:
393 try:
394 cb(raw)
395 except Exception as e:
396 print(f"Exception in simulator output callback: {e}")
397
398 threads: List[threading.Thread] = [
399 threading.Thread(target=_reader,
400 args=(proc.stdout, stdout_cb),
401 daemon=True),
402 threading.Thread(target=_reader,
403 args=(proc.stderr, stderr_cb),
404 daemon=True),
405 ]
406 for t in threads:
407 t.start()
408 if wait:
409 for t in threads:
410 t.join()
411 return proc.wait()
412 return proc, threads
413
def run(self,
        inner_command: str,
        gui: bool = False,
        server_only: bool = False) -> int:
    """Start the simulation, run `inner_command` against it, then stop it.

    Args:
        inner_command: Command handed to `subprocess.run`. It receives the
            connection info through the `ESI_COSIM_PORT` and
            `ESI_COSIM_HOST` environment variables.
        gui: Passed on to `run_proc`.
        server_only: If True, do not run `inner_command`; keep the simulation
            up until the user presses enter instead.

    Returns:
        0 in server-only mode, otherwise the return code of `inner_command`.
    """

    # The finally block below reads 'simProc', so it has to be bound even
    # when run_proc() raises.
    simProc = None
    try:
        simProc = self.run_proc(gui=gui)
        if server_only:
            # wait for user input to kill the server
            input(
                f"Running in server-only mode on port {simProc.port} - Press anything to kill the server..."
            )
            return 0

        # Run the inner command, passing the connection info via environment
        # vars.
        testEnv = os.environ.copy()
        testEnv.update({
            "ESI_COSIM_PORT": str(simProc.port),
            "ESI_COSIM_HOST": "localhost",
        })
        completed = subprocess.run(inner_command,
                                   cwd=os.getcwd(),
                                   env=testEnv)
        ret = completed.returncode
        if simProc.gui:
            print("GUI mode - waiting for simulator to exit...")
            simProc.proc.wait()
        return ret
    finally:
        # Only stop a simulation that was started and is still running.
        if simProc and simProc.proc.poll() is None:
            simProc.force_stop()
446
447
def get_simulator(name: str,
                  sources: SourceFiles,
                  rundir: Path,
                  debug: bool,
                  save_waveform: bool = False) -> Simulator:
    """Construct the simulator backend selected by `name`.

    Args:
        name: Backend name, matched case-insensitively ("verilator" or
            "questa").
        sources: Passed through to the backend constructor.
        rundir: Passed through to the backend constructor.
        debug: Passed through to the backend constructor.
        save_waveform: Passed through to the backend constructor.

    Returns:
        An instance of the selected backend class.

    Raises:
        ValueError: If `name` does not match a known backend.
    """
    backend_name = name.lower()
    # Backend modules are imported only when they are selected.
    if backend_name == "verilator":
        from .verilator import Verilator as backend
    elif backend_name == "questa":
        from .questa import Questa as backend
    else:
        raise ValueError(f"Unknown simulator: {backend_name}")
    return backend(sources, rundir, debug, save_waveform=save_waveform)
static void print(TypedAttr val, llvm::raw_ostream &os)
static mlir::Operation * resolve(Context &context, mlir::SymbolRefAttr sym)
static StringAttr append(StringAttr base, const Twine &suffix)
Return a attribute with the specified suffix appended.
__init__(self, subprocess.Popen proc, int port, Optional[List[threading.Thread]] threads=None, bool gui=False)
Definition simulator.py:103
int|tuple[subprocess.Popen, List[threading.Thread]] _start_process_with_callbacks(self, List[str] cmd, Optional[Dict[str, str]] env, Optional[Path] cwd, Optional[Callable[[str], None]] stdout_cb, Optional[Callable[[str], None]] stderr_cb, bool wait)
Definition simulator.py:363
__init__(self, SourceFiles sources, Path run_dir, bool debug, bool save_waveform=False, Optional[Callable[[str], None]] run_stdout_callback=None, Optional[Callable[[str], None]] run_stderr_callback=None, Optional[Callable[[str], None]] compile_stdout_callback=None, Optional[Callable[[str], None]] compile_stderr_callback=None, bool make_default_logs=True, Optional[Dict[str, str]] macro_definitions=None)
Definition simulator.py:146
int run(self, str inner_command, bool gui=False, bool server_only=False)
Definition simulator.py:417
List[CompileStep] compile_commands(self)
Definition simulator.py:222
int _run_compile_command(self, CompileCommand cmd)
Definition simulator.py:231
str waveform_extension(self)
Definition simulator.py:280
int _run_compile_step(self, CompileStep step)
Definition simulator.py:256
Dict[str, str] get_env()
Definition simulator.py:212
List[str] run_command(self, bool gui)
Definition simulator.py:275
SimProcess run_proc(self, bool gui=False)
Definition simulator.py:289
List[Path] rtl_sources(self)
Definition simulator.py:92
add_file(self, Path file)
Definition simulator.py:44
None __init__(self, str top)
Definition simulator.py:29
List[Path] dpi_so_paths(self)
Definition simulator.py:59
add_dir(self, Path dir)
Definition simulator.py:51
bool is_port_open(port)
Definition simulator.py:19