CIRCT 23.0.0git
Loading...
Searching...
No Matches
simulator.py
Go to the documentation of this file.
1# Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
2# See https://llvm.org/LICENSE.txt for license information.
3# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
4
5import os
6import re
7import signal
8import socket
9import subprocess
10import time
11from pathlib import Path
12from typing import Dict, List, Optional, Callable, IO, Union
13import threading
14
# Directory that holds this module.
_thisdir = Path(__file__).parent
# Base directory for the cosim collateral files; it is the directory of this
# module.
CosimCollateralDir = _thisdir
17
18
def is_port_open(port) -> bool:
  """Check if a TCP port is open locally.

  Args:
    port: TCP port number to probe on 127.0.0.1.

  Returns:
    True if a TCP connection to the port succeeded, False otherwise.
  """
  # The context manager closes the socket even when connect_ex() raises
  # (for example on an out-of-range port number).
  with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
    # connect_ex() returns 0 on success and an errno value on failure.
    return sock.connect_ex(('127.0.0.1', port)) == 0
25
26
28
def __init__(self, top: str) -> None:
  """Create an empty source collection for the top module named `top`.

  Args:
    top: Name of the top module.
  """
  # Name of the top module.
  self.top = top
  # User source files.
  self.user: List[Path] = []
  # DPI shared objects.
  self.dpi_so: List[str] = ["EsiCosimDpiServer"]
  # DPI SV files, all located in the cosim collateral directory.
  self.dpi_sv: List[Path] = [
      CosimCollateralDir / sv_name for sv_name in (
          "Cosim_DpiPkg.sv",
          "Cosim_Endpoint.sv",
          "Cosim_CycleCount.sv",
          "Cosim_Manifest.sv",
      )
  ]
43
def add_file(self, file: Path):
  """Append one RTL file to the user source list.

  Args:
    file: Path of the file to add.

  Raises:
    FileNotFoundError: if `file` is not an existing regular file.
  """
  if not file.is_file():
    raise FileNotFoundError(f"File {file} does not exist")
  self.user.append(file)
50
def add_dir(self, dir: Path):
  """Recursively add every `.sv` and `.v` file under `dir` to the source list.

  Entries are visited in sorted order within each directory; subdirectories
  are descended into as they are encountered.

  Args:
    dir: Directory to scan.
  """
  rtl_suffixes = (".sv", ".v")
  for entry in sorted(dir.iterdir()):
    if entry.is_dir():
      self.add_dir(entry)
    elif entry.is_file() and entry.suffix in rtl_suffixes:
      self.user.append(entry)
58
def dpi_so_paths(self) -> List[Path]:
  """Return a list of all the DPI shared object files.

  Each name in `self.dpi_so` is searched for, in order, in:
    1. every directory listed in the `LD_LIBRARY_PATH` returned by
       `Simulator.get_env()` (split on ':'), then
    2. the `lib` directories one and two levels above this file's directory.

  The file name searched for is `<name>.dll` when `os.name == "nt"` and
  `lib<name>.so` otherwise.

  Returns:
    One path per entry of `self.dpi_so`, in the same order.

  Raises:
    FileNotFoundError: if a shared object is not found in any search location.
  """
  # Build the search path once instead of re-copying the environment for
  # every library name.
  search_dirs: List[Path] = [
      Path(entry)
      for entry in Simulator.get_env().get("LD_LIBRARY_PATH", "").split(":")
  ]
  # If it's not in LD_LIBRARY_PATH, check a couple of parent directories
  # relative to this file.
  search_dirs.append(_thisdir.parent / "lib")
  search_dirs.append(_thisdir.parent.parent / "lib")

  def find_so(name: str) -> Path:
    filename = f"{name}.dll" if os.name == "nt" else f"lib{name}.so"
    for directory in search_dirs:
      candidate = directory / filename
      if candidate.exists():
        return candidate
    # Report the file name that was actually searched for.
    raise FileNotFoundError(f"Could not find {filename}")

  return [find_so(name) for name in self.dpi_so]
90
@property
def rtl_sources(self) -> List[Path]:
  """All RTL sources: the DPI SV files followed by the user files.

  A new list is returned on every access.
  """
  sources: List[Path] = list(self.dpi_sv)
  sources.extend(self.user)
  return sources
95
96
98
def __init__(self,
             proc: subprocess.Popen,
             port: int,
             threads: Optional[List[threading.Thread]] = None,
             gui: bool = False):
  """Record a running simulation process.

  Args:
    proc: The simulator subprocess.
    port: Port number associated with the simulation.
    threads: Threads attached to the process; an empty list is used when
      none (or an empty value) is given.
    gui: Whether the simulation was started in GUI mode.
  """
  self.proc = proc
  self.port = port
  self.gui = gui
  self.threads: List[threading.Thread] = threads if threads else []
108
def force_stop(self):
  """Make sure to stop the simulation no matter what.

  Interrupts the simulator, gives it up to one second to exit on its own,
  kills it if it has not, and finally joins the threads in `self.threads`.
  Calling this on a process that has already exited is not an error.
  """
  if self.proc:
    try:
      if os.name == "posix":
        os.killpg(os.getpgid(self.proc.pid), signal.SIGINT)
      else:
        # os.killpg/os.getpgid are not available on Windows. CTRL_BREAK_EVENT
        # can be delivered to a process started with
        # CREATE_NEW_PROCESS_GROUP.
        self.proc.send_signal(signal.CTRL_BREAK_EVENT)
    except ProcessLookupError:
      # The process is already gone; nothing left to interrupt.
      pass
    # Allow the simulation time to flush its outputs.
    try:
      self.proc.wait(timeout=1.0)
    except subprocess.TimeoutExpired:
      # If the simulation doesn't exit of its own free will, kill it.
      self.proc.kill()
      # Reap the killed process so it does not linger as a zombie.
      self.proc.wait()

  # Join reader threads (they should exit once pipes are closed).
  for t in self.threads:
    t.join()
123
124
126
# A compile step given as a command line (list of argument strings).
CompileCommand = List[str]
# A compile step given as a Python callable returning an optional int.
CompileFunction = Callable[[], Optional[int]]
# Either form of compile step.
CompileStep = Union[CompileCommand, CompileFunction]

# Not every RTL simulator reports errors on stderr; some send everything to
# stdout. Subclasses wrapping such a simulator can declare that by overriding
# this flag.
UsesStderr = True
135
def __init__(self,
             sources: SourceFiles,
             run_dir: Path,
             debug: bool,
             save_waveform: bool = False,
             run_stdout_callback: Optional[Callable[[str], None]] = None,
             run_stderr_callback: Optional[Callable[[str], None]] = None,
             compile_stdout_callback: Optional[Callable[[str], None]] = None,
             compile_stderr_callback: Optional[Callable[[str], None]] = None,
             make_default_logs: bool = True,
             macro_definitions: Optional[Dict[str, str]] = None):
  """Simulator base class.

  Each of the four output streams (compile/run x stdout/stderr) is routed to
  the callback supplied for it. A stream without a callback gets a log file
  in `run_dir` when `make_default_logs` is True, and no sink otherwise.

  Args:
    sources: SourceFiles describing RTL/DPI inputs.
    run_dir: Directory where build/run artifacts are placed.
    debug: Enable cosim debug mode.
    save_waveform: When True and debug=True, dump simulator waveforms to a
      waveform file. The exact format depends on the backend (e.g. FST for
      Verilator, VCD for Questa). Requires debug to be enabled.
    run_stdout_callback: Line-based callback for runtime stdout.
    run_stderr_callback: Line-based callback for runtime stderr.
    compile_stdout_callback: Line-based callback for compile stdout.
    compile_stderr_callback: Line-based callback for compile stderr.
    make_default_logs: If True and corresponding callback is not supplied,
      create log file and emit via internally-created callback.
    macro_definitions: Optional dictionary of macro definitions to be defined
      during compilation.
  """
  self.sources = sources
  self.run_dir = run_dir
  self.debug = debug
  self.save_waveform = save_waveform
  self.macro_definitions = macro_definitions

  # Every log file handle opened below is also recorded here.
  self._default_files: List[IO[str]] = []

  def _open_log_sink(filename: str):
    """Create `run_dir / filename`; return (line writer, file handle)."""
    log_path = self.run_dir / filename
    log_path.parent.mkdir(parents=True, exist_ok=True)
    # Opened read/write ("w+") rather than write-only.
    handle = log_path.open("w+")
    self._default_files.append(handle)

    def _write_line(line: str, _lf=handle):
      # One line per call, flushed immediately.
      _lf.write(line + "\n")
      _lf.flush()

    return _write_line, handle

  def _pick_sink(cb: Optional[Callable[[str], None]], filename: str):
    """Return (callback, file_handle_or_None) for one output stream.

    A supplied callback is returned unchanged with no file. Otherwise a log
    file sink is created when `make_default_logs` is set, else (None, None).
    """
    if cb is not None:
      return cb, None
    if make_default_logs:
      return _open_log_sink(filename)
    return None, None

  # Compile-time sinks first, then run-time sinks.
  compile_out = _pick_sink(compile_stdout_callback, 'compile_stdout.log')
  self._compile_stdout_cb, self._compile_stdout_log = compile_out
  compile_err = _pick_sink(compile_stderr_callback, 'compile_stderr.log')
  self._compile_stderr_cb, self._compile_stderr_log = compile_err
  run_out = _pick_sink(run_stdout_callback, 'sim_stdout.log')
  self._run_stdout_cb, self._run_stdout_log = run_out
  run_err = _pick_sink(run_stderr_callback, 'sim_stderr.log')
  self._run_stderr_cb, self._run_stderr_log = run_err
210
@staticmethod
def get_env() -> Dict[str, str]:
  """Get the environment variables to locate shared objects.

  Returns:
    A copy of `os.environ` in which `LIBRARY_PATH` and `LD_LIBRARY_PATH` have
    the `lib` directories one and two levels above this file's directory
    appended (':'-separated) after any existing value.
  """
  env = os.environ.copy()
  extra_dirs = [
      str(_thisdir.parent / "lib"),
      str(_thisdir.parent.parent / "lib"),
  ]
  for var in ("LIBRARY_PATH", "LD_LIBRARY_PATH"):
    existing = env.get(var, "")
    # Only keep the existing value when it is non-empty. Joining an empty
    # value would produce a leading ':' and therefore an empty path element,
    # which the dynamic loader interprets as the current working directory.
    env[var] = ":".join(([existing] if existing else []) + extra_dirs)
  return env
221
def compile_commands(self) -> List[CompileStep]:
  """Return the compile steps for the simulator.

  Each step may be either a shell command (`List[str]`) or a Python callback
  (`Callable[[], Optional[int]]`). Python callbacks should return `0` or
  `None` on success and a non-zero integer on failure.

  Raises:
    NotImplementedError: always; subclasses must override this method.
  """
  # Raise explicitly instead of `assert False`: asserts are removed under
  # `python -O`, which would make this silently return None.
  raise NotImplementedError("Must be implemented by subclass")
230
def _run_compile_command(self, cmd: CompileCommand) -> int:
  """Run one compile command to completion and return its exit code.

  On a non-zero exit code a failure banner is printed, followed by the
  non-empty contents of the default compile stdout/stderr log files (when
  those exist). Returns 1 if the process helper did not return an int.
  """
  outcome = self._start_process_with_callbacks(
      cmd,
      env=Simulator.get_env(),
      cwd=None,
      stdout_cb=self._compile_stdout_cb,
      stderr_cb=self._compile_stderr_cb,
      wait=True)
  if not isinstance(outcome, int):
    return 1
  if outcome == 0:
    return 0

  print("====== Compilation failure")
  # Dump stdout as well as stderr so that linker errors (which go to stdout
  # for cmake/ninja) are not silently hidden.
  for log in (self._compile_stdout_log, self._compile_stderr_log):
    if log is None:
      continue
    log.seek(0)
    captured = log.read()
    if captured:
      print(captured)
  return outcome
255
def _run_compile_step(self, step: CompileStep) -> int:
  """Execute one compile step and return its status code.

  A non-callable step is run as a command via `_run_compile_command`. A
  callable step is invoked; `None` is reported as 0 and an int is returned
  as is.

  Raises:
    TypeError: if a callable step returns something other than int or None.
  """
  if not callable(step):
    return self._run_compile_command(step)

  status = step()
  if status is None:
    return 0
  if isinstance(status, int):
    return status
  raise TypeError("compile step callback must return int or None")
265
def compile(self) -> int:
  """Run the compile steps in order, stopping at the first failure.

  The step list is obtained from `compile_commands()` and `run_dir` is
  created before any step runs.

  Returns:
    The first non-zero step status, or 0 if every step succeeded.
  """
  steps = self.compile_commands()
  self.run_dir.mkdir(parents=True, exist_ok=True)
  # map() is lazy, so no step runs after the first non-zero status.
  statuses = map(self._run_compile_step, steps)
  return next((status for status in statuses if status != 0), 0)
274
def run_command(self, gui: bool) -> List[str]:
  """Return the command to run the simulation.

  Args:
    gui: Whether the simulation should be run in GUI mode.

  Raises:
    NotImplementedError: always; subclasses must override this method.
  """
  # Raise explicitly instead of `assert False`: asserts are removed under
  # `python -O`, which would make this silently return None.
  raise NotImplementedError("Must be implemented by subclass")
278
@property
def waveform_extension(self) -> str:
  """Extension (including the leading dot) used for waveform dump files.

  The default is VCD, which is what the generic SV driver produces through
  ``$dumpfile/$dumpvars``. Subclasses whose driver writes another format
  should override this; for instance the Verilator C++ driver writes FST
  (via ``VerilatedFstC``).
  """
  default_extension = ".vcd"
  return default_extension
288
def run_proc(self, gui: bool = False) -> SimProcess:
  """Run the simulation process. Returns a SimProcess holding the Popen
  object and the port which the simulation is listening on.

  If user-provided stdout/stderr sinks were supplied in the constructor,
  they are used. Otherwise, log files are created in `run_dir`.

  Args:
    gui: Run the simulator in GUI mode. Forced to True when the `COSIM_GUI`
      environment variable is set to anything other than "0".

  Raises:
    RuntimeError: if the simulator exits before a port appears in
      `cosim.cfg`.
    Exception: if, outside GUI mode, no port appears in `cosim.cfg` after
      more than 500 polls at 0.1 s intervals.
  """
  self.run_dir.mkdir(parents=True, exist_ok=True)

  if os.environ.get("COSIM_GUI", "0") != "0":
    gui = True

  # Erase the config file if it exists. We don't want to read
  # an old config.
  port_file = self.run_dir / "cosim.cfg"
  if os.path.exists(port_file):
    os.remove(port_file)

  # Run the simulation.
  sim_env = Simulator.get_env()
  if self.debug:
    debug_file = (self.run_dir / "cosim_debug.log").resolve()
    sim_env["COSIM_DEBUG_FILE"] = str(debug_file)
    if "DEBUG_PERIOD" not in sim_env:
      # Slow the simulation down to one tick per millisecond.
      sim_env["DEBUG_PERIOD"] = "1"
    if self.save_waveform:
      waveform_file = (self.run_dir /
                       f"cosim_waveform{self.waveform_extension}").resolve()
      sim_env["SAVE_WAVE"] = str(waveform_file)
  rcmd = self.run_command(gui)
  # Start process with asynchronous output capture.
  proc, threads = self._start_process_with_callbacks(
      rcmd,
      env=sim_env,
      cwd=self.run_dir,
      stdout_cb=self._run_stdout_cb,
      stderr_cb=self._run_stderr_cb,
      wait=False)

  port_re = re.compile(r"port: (\d+)")

  def _read_port() -> int:
    """Return the last port listed in the cfg file, or -1 if none is there."""
    found = -1
    try:
      with open(port_file, "r") as cfg:
        for line in cfg:
          m = port_re.match(line)
          if m is not None:
            found = int(m.group(1))
    except FileNotFoundError:
      pass
    return found

  # Get the port which the simulation RPC selected.
  try:
    polls = 0
    while True:
      # Sample liveness *before* reading so that a simulator which wrote the
      # file and then exited is still read one final time.
      exited = proc.poll() is not None
      port = _read_port()
      if port >= 0:
        break
      if exited:
        raise RuntimeError(
            f"Simulator exited with code {proc.returncode} before writing a "
            f"port to {port_file}")
      polls += 1
      if polls > 500 and not gui:
        raise Exception(f"Cosim never wrote cfg file: {port_file}")
      # Sleep between polls rather than spinning on the file.
      time.sleep(0.1)
  except BaseException:
    # Don't leak the simulator: no SimProcess is returned on this path, so
    # the caller has no handle with which to stop it.
    if proc.poll() is None:
      SimProcess(proc=proc, port=-1, threads=threads, gui=gui).force_stop()
    raise

  # The cosim server writes ``cosim.cfg`` after its TCP listen socket is
  # bound and the accept thread has been spawned. So we don't need to wait for
  # the port to be opened.
  return SimProcess(proc=proc, port=port, threads=threads, gui=gui)
351
353 self, cmd: List[str], env: Optional[Dict[str, str]], cwd: Optional[Path],
354 stdout_cb: Optional[Callable[[str],
355 None]], stderr_cb: Optional[Callable[[str],
356 None]],
357 wait: bool) -> int | tuple[subprocess.Popen, List[threading.Thread]]:
358 """Start a subprocess and stream its stdout/stderr to callbacks.
359
360 If wait is True, blocks until process completes and returns its exit code.
361 If wait is False, returns the Popen object (threads keep streaming).
362 """
363 if os.name == "posix":
364 proc = subprocess.Popen(cmd,
365 stdout=subprocess.PIPE,
366 stderr=subprocess.PIPE,
367 env=env,
368 cwd=cwd,
369 text=True,
370 preexec_fn=os.setsid)
371 else: # windows
372 proc = subprocess.Popen(cmd,
373 stdout=subprocess.PIPE,
374 stderr=subprocess.PIPE,
375 env=env,
376 cwd=cwd,
377 text=True,
378 creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
379
380 def _reader(pipe, cb):
381 if pipe is None:
382 return
383 for raw in pipe:
384 if raw.endswith('\n'):
385 raw = raw[:-1]
386 if cb:
387 try:
388 cb(raw)
389 except Exception as e:
390 print(f"Exception in simulator output callback: {e}")
391
392 threads: List[threading.Thread] = [
393 threading.Thread(target=_reader,
394 args=(proc.stdout, stdout_cb),
395 daemon=True),
396 threading.Thread(target=_reader,
397 args=(proc.stderr, stderr_cb),
398 daemon=True),
399 ]
400 for t in threads:
401 t.start()
402 if wait:
403 for t in threads:
404 t.join()
405 return proc.wait()
406 return proc, threads
407
def run(self,
        inner_command: str,
        gui: bool = False,
        server_only: bool = False) -> int:
  """Start the simulation, run `inner_command` against it, then stop it.

  In server-only mode no command is run; the simulation stays up until the
  user answers the prompt and 0 is returned. Otherwise the command is run
  with `ESI_COSIM_PORT` and `ESI_COSIM_HOST` set in its environment and its
  return code is returned. In GUI mode this waits for the simulator to exit
  after the command finishes. A simulator that is still running on the way
  out is force-stopped.
  """
  # Bound before the try so the cleanup below can tell whether the simulation
  # was ever started.
  sim = None
  try:
    sim = self.run_proc(gui=gui)
    if server_only:
      # wait for user input to kill the server
      input(
          f"Running in server-only mode on port {sim.port} - Press anything to kill the server..."
      )
      return 0

    # Run the inner command, passing the connection info via environment vars.
    child_env = dict(os.environ,
                     ESI_COSIM_PORT=str(sim.port),
                     ESI_COSIM_HOST="localhost")
    completed = subprocess.run(inner_command, cwd=os.getcwd(), env=child_env)
    if sim.gui:
      print("GUI mode - waiting for simulator to exit...")
      sim.proc.wait()
    return completed.returncode
  finally:
    if sim and sim.proc.poll() is None:
      sim.force_stop()
440
441
def get_simulator(name: str,
                  sources: SourceFiles,
                  rundir: Path,
                  debug: bool,
                  save_waveform: bool = False) -> Simulator:
  """Construct the simulator backend selected by `name` (case-insensitive).

  The backend module is imported only when it is selected.

  Args:
    name: Backend name; "verilator" or "questa".
    sources: Source files handed to the backend.
    rundir: Run directory handed to the backend.
    debug: Debug flag handed to the backend.
    save_waveform: Waveform flag handed to the backend.

  Raises:
    ValueError: if `name` is not a known simulator.
  """
  backend = name.lower()
  if backend == "verilator":
    from .verilator import Verilator
    return Verilator(sources, rundir, debug, save_waveform=save_waveform)
  if backend == "questa":
    from .questa import Questa
    return Questa(sources, rundir, debug, save_waveform=save_waveform)
  raise ValueError(f"Unknown simulator: {backend}")
static void print(TypedAttr val, llvm::raw_ostream &os)
static mlir::Operation * resolve(Context &context, mlir::SymbolRefAttr sym)
static StringAttr append(StringAttr base, const Twine &suffix)
Return a attribute with the specified suffix appended.
__init__(self, subprocess.Popen proc, int port, Optional[List[threading.Thread]] threads=None, bool gui=False)
Definition simulator.py:103
int|tuple[subprocess.Popen, List[threading.Thread]] _start_process_with_callbacks(self, List[str] cmd, Optional[Dict[str, str]] env, Optional[Path] cwd, Optional[Callable[[str], None]] stdout_cb, Optional[Callable[[str], None]] stderr_cb, bool wait)
Definition simulator.py:357
__init__(self, SourceFiles sources, Path run_dir, bool debug, bool save_waveform=False, Optional[Callable[[str], None]] run_stdout_callback=None, Optional[Callable[[str], None]] run_stderr_callback=None, Optional[Callable[[str], None]] compile_stdout_callback=None, Optional[Callable[[str], None]] compile_stderr_callback=None, bool make_default_logs=True, Optional[Dict[str, str]] macro_definitions=None)
Definition simulator.py:146
int run(self, str inner_command, bool gui=False, bool server_only=False)
Definition simulator.py:411
List[CompileStep] compile_commands(self)
Definition simulator.py:222
int _run_compile_command(self, CompileCommand cmd)
Definition simulator.py:231
str waveform_extension(self)
Definition simulator.py:280
int _run_compile_step(self, CompileStep step)
Definition simulator.py:256
Dict[str, str] get_env()
Definition simulator.py:212
List[str] run_command(self, bool gui)
Definition simulator.py:275
SimProcess run_proc(self, bool gui=False)
Definition simulator.py:289
List[Path] rtl_sources(self)
Definition simulator.py:92
add_file(self, Path file)
Definition simulator.py:44
None __init__(self, str top)
Definition simulator.py:29
List[Path] dpi_so_paths(self)
Definition simulator.py:59
add_dir(self, Path dir)
Definition simulator.py:51
bool is_port_open(port)
Definition simulator.py:19