CIRCT 22.0.0git
Loading...
Searching...
No Matches
simulator.py
Go to the documentation of this file.
1# Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
2# See https://llvm.org/LICENSE.txt for license information.
3# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
4
5import os
6import re
7import signal
8import socket
9import subprocess
10import time
11from pathlib import Path
12from typing import Dict, List, Optional, Callable, IO
13import threading
14
# Directory containing this module. The cosim collateral files referenced
# below are resolved relative to it.
CosimCollateralDir = Path(__file__).parent
# Private alias of the same directory, used when locating the sibling `lib`
# directory.
_thisdir = CosimCollateralDir
17
18
def is_port_open(port) -> bool:
    """Check if a TCP port is open locally.

    Attempts a TCP connection to 127.0.0.1 on `port`.

    Args:
        port: TCP port number to probe.

    Returns:
        True if the connection attempt succeeded, False otherwise.
    """
    # The context manager guarantees the socket is closed even when
    # connect_ex() raises (e.g. for an out-of-range port number).
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        return sock.connect_ex(('127.0.0.1', port)) == 0
25
26
class SourceFiles:
    """The RTL sources and DPI collateral which make up a cosimulation build."""

    def __init__(self, top: str) -> None:
        """Create a source list containing only the built-in DPI collateral.

        Args:
            top: Name of the top module.
        """
        # User source files.
        self.user: List[Path] = []
        # DPI shared objects.
        self.dpi_so: List[str] = ["EsiCosimDpiServer"]
        # DPI SV files.
        self.dpi_sv: List[Path] = [
            CosimCollateralDir / "Cosim_DpiPkg.sv",
            CosimCollateralDir / "Cosim_Endpoint.sv",
            CosimCollateralDir / "Cosim_CycleCount.sv",
            CosimCollateralDir / "Cosim_Manifest.sv",
        ]
        # Name of the top module.
        self.top = top

    def add_file(self, file: Path):
        """Add a single RTL file to the source list.

        Args:
            file: Path of the file to add.

        Raises:
            FileNotFoundError: If `file` is not an existing regular file.
        """
        if not file.is_file():
            raise FileNotFoundError(f"File {file} does not exist")
        self.user.append(file)

    def add_dir(self, dir: Path):
        """Add all the RTL files in a directory to the source list.

        Entries are visited in sorted order. Files ending in `.sv` or `.v` are
        added; subdirectories are searched recursively; everything else is
        ignored.

        Args:
            dir: Directory to search.
        """
        for entry in sorted(dir.iterdir()):
            if entry.is_file() and entry.suffix in (".sv", ".v"):
                self.user.append(entry)
            elif entry.is_dir():
                self.add_dir(entry)

    def dpi_so_paths(self) -> List[Path]:
        """Return a list of all the DPI shared object files.

        Each name in `self.dpi_so` is searched for in the directories listed in
        the `LD_LIBRARY_PATH` entry of `Simulator.get_env()`.

        Raises:
            FileNotFoundError: If a shared object is in none of the directories.
        """

        def find_so(name: str) -> Path:
            # The file name depends only on the platform, so compute it once
            # and use the same name in the error message.
            filename = f"{name}.dll" if os.name == "nt" else f"lib{name}.so"
            # Split on ":" to mirror how Simulator.get_env() joins the entries.
            for path in Simulator.get_env().get("LD_LIBRARY_PATH", "").split(":"):
                so = Path(path) / filename
                if so.exists():
                    return so
            raise FileNotFoundError(
                f"Could not find {filename} in LD_LIBRARY_PATH")

        return [find_so(name) for name in self.dpi_so]

    @property
    def rtl_sources(self) -> List[Path]:
        """Return a list of all the RTL source files.

        The DPI SystemVerilog files come first, followed by the user files.
        """
        return self.dpi_sv + self.user
78
79
class SimProcess:
    """A running simulation process together with its output reader threads."""

    def __init__(self,
                 proc: subprocess.Popen,
                 port: int,
                 threads: Optional[List[threading.Thread]] = None,
                 gui: bool = False):
        """Record the handles of a started simulation.

        Args:
            proc: The simulator process.
            port: Port number associated with the simulation.
            threads: Threads streaming the process output, if any.
            gui: Whether the simulator was started in GUI mode.
        """
        self.proc = proc
        self.port = port
        self.threads: List[threading.Thread] = threads or []
        self.gui = gui

    def force_stop(self):
        """Make sure to stop the simulation no matter what.

        Sends an interrupt, gives the process one second to exit, kills it if
        it is still running, then joins the reader threads. Safe to call on a
        process which has already exited.
        """
        if self.proc:
            try:
                if os.name == "posix":
                    os.killpg(os.getpgid(self.proc.pid), signal.SIGINT)
                else:
                    # os.killpg does not exist on Windows. This file launches
                    # the process with CREATE_NEW_PROCESS_GROUP, for which
                    # CTRL_BREAK_EVENT is the deliverable console signal.
                    self.proc.send_signal(signal.CTRL_BREAK_EVENT)
            except ProcessLookupError:
                # The process is already gone; nothing to interrupt.
                pass
            # Allow the simulation time to flush its outputs.
            try:
                self.proc.wait(timeout=1.0)
            except subprocess.TimeoutExpired:
                # If the simulation doesn't exit of its own free will, kill it
                # and reap it so it doesn't linger as a zombie.
                self.proc.kill()
                self.proc.wait()

        # Join reader threads (they should exit once pipes are closed).
        for t in self.threads:
            t.join()
106
107
class Simulator:
    """Base class for building and running an RTL simulation with cosim.

    Subclasses provide the tool-specific commands by overriding
    `compile_commands()` and `run_command()`.
    """

    # Some RTL simulators don't use stderr for error messages. Everything goes to
    # stdout. Boo! They should feel bad about this. Also, they can specify that
    # broken behavior by overriding this.
    UsesStderr = True

    def __init__(self,
                 sources: SourceFiles,
                 run_dir: Path,
                 debug: bool,
                 run_stdout_callback: Optional[Callable[[str], None]] = None,
                 run_stderr_callback: Optional[Callable[[str], None]] = None,
                 compile_stdout_callback: Optional[Callable[[str],
                                                            None]] = None,
                 compile_stderr_callback: Optional[Callable[[str],
                                                            None]] = None,
                 make_default_logs: bool = True,
                 macro_definitions: Optional[Dict[str, str]] = None):
        """Simulator base class.

        Optional sinks can be provided for capturing output. If not provided,
        the simulator will write to log files in `run_dir`.

        Args:
            sources: SourceFiles describing RTL/DPI inputs.
            run_dir: Directory where build/run artifacts are placed.
            debug: Enable cosim debug mode.
            run_stdout_callback: Line-based callback for runtime stdout.
            run_stderr_callback: Line-based callback for runtime stderr.
            compile_stdout_callback: Line-based callback for compile stdout.
            compile_stderr_callback: Line-based callback for compile stderr.
            make_default_logs: If True and corresponding callback is not
                supplied, create log file and emit via internally-created
                callback.
            macro_definitions: Optional dictionary of macro definitions to be
                defined during compilation.
        """
        self.sources = sources
        self.run_dir = run_dir
        self.debug = debug
        self.macro_definitions = macro_definitions

        # Unified list of any log file handles we opened.
        self._default_files: List[IO[str]] = []

        def _ensure_default(cb: Optional[Callable[[str], None]], filename: str):
            """Return (callback, file_handle_or_None) with optional file creation.

            Behavior:
              * If a callback is provided, return it unchanged with no file.
              * If no callback and make_default_logs is False, return
                (None, None).
              * If no callback and make_default_logs is True, create a log
                file and return a writer callback plus the opened file handle.
            """
            if cb is not None:
                return cb, None
            if not make_default_logs:
                return None, None
            p = self.run_dir / filename
            p.parent.mkdir(parents=True, exist_ok=True)
            # "w+" so compile() can seek back and re-read the log on failure.
            logf = p.open("w+")
            self._default_files.append(logf)

            def _writer(line: str, _lf=logf):
                _lf.write(line + "\n")
                _lf.flush()

            return _writer, logf

        # Initialize all four (compile/run stdout/stderr) uniformly.
        self._compile_stdout_cb, self._compile_stdout_log = _ensure_default(
            compile_stdout_callback, 'compile_stdout.log')
        self._compile_stderr_cb, self._compile_stderr_log = _ensure_default(
            compile_stderr_callback, 'compile_stderr.log')
        self._run_stdout_cb, self._run_stdout_log = _ensure_default(
            run_stdout_callback, 'sim_stdout.log')
        self._run_stderr_cb, self._run_stderr_log = _ensure_default(
            run_stderr_callback, 'sim_stderr.log')

    @staticmethod
    def get_env() -> Dict[str, str]:
        """Get the environment variables to locate shared objects.

        Returns:
            A copy of the current environment with the `lib` directory next to
            this package appended to `LIBRARY_PATH` and `LD_LIBRARY_PATH`.
        """
        env = os.environ.copy()
        lib_dir = str(_thisdir.parent / "lib")
        for var in ("LIBRARY_PATH", "LD_LIBRARY_PATH"):
            existing = env.get(var, "")
            # Don't emit a leading ":" when the variable is unset: an empty
            # entry in these search paths means the current directory.
            env[var] = f"{existing}:{lib_dir}" if existing else lib_dir
        return env

    def compile_commands(self) -> List[List[str]]:
        """Return the commands (as argv lists) which compile the sources.

        Raises:
            NotImplementedError: Always; subclasses must override this.
        """
        # Raise rather than `assert False`, which is stripped under `python -O`.
        raise NotImplementedError("Must be implemented by subclass")

    def compile(self) -> int:
        """Run each compile command in order, stopping at the first failure.

        On failure, the default compile log (if this object created one) is
        printed to the console.

        Returns:
            0 if every command succeeded, otherwise the exit code of the first
            command which failed.
        """
        cmds = self.compile_commands()
        self.run_dir.mkdir(parents=True, exist_ok=True)
        for cmd in cmds:
            ret = self._start_process_with_callbacks(
                cmd,
                env=Simulator.get_env(),
                cwd=None,
                stdout_cb=self._compile_stdout_cb,
                stderr_cb=self._compile_stderr_cb,
                wait=True)
            if isinstance(ret, int) and ret != 0:
                print("====== Compilation failure")

                # If we have the default file loggers, print the compilation
                # logs to console. Else, assume that the user has already
                # captured them.
                if self.UsesStderr:
                    log = self._compile_stderr_log
                else:
                    log = self._compile_stdout_log
                if log is not None:
                    log.seek(0)
                    print(log.read())

                return ret
        return 0

    def run_command(self, gui: bool) -> List[str]:
        """Return the command to run the simulation.

        Raises:
            NotImplementedError: Always; subclasses must override this.
        """
        # Raise rather than `assert False`, which is stripped under `python -O`.
        raise NotImplementedError("Must be implemented by subclass")

    @staticmethod
    def _read_port(port_file: Path) -> int:
        """Return the port recorded in `port_file`, or -1 if not available yet."""
        port = -1
        try:
            with port_file.open("r") as f:
                for line in f:
                    m = re.match("port: (\\d+)", line)
                    if m is not None:
                        port = int(m.group(1))
        except FileNotFoundError:
            # The simulation has not written the file yet.
            pass
        return port

    def run_proc(self, gui: bool = False) -> SimProcess:
        """Run the simulation process. Returns a SimProcess holding the Popen
        object and the port which the simulation is listening on.

        If user-provided stdout/stderr sinks were supplied in the constructor,
        they are used. Otherwise, log files are created in `run_dir`.

        Raises:
            RuntimeError: If the simulation exits during startup, never
                reports its port, or never opens the port. The simulation
                process is stopped before the error propagates.
        """
        self.run_dir.mkdir(parents=True, exist_ok=True)

        if os.environ.get("COSIM_GUI", "0") != "0":
            gui = True

        # Erase the config file if it exists. We don't want to read
        # an old config.
        port_file = self.run_dir / "cosim.cfg"
        port_file.unlink(missing_ok=True)

        # Run the simulation.
        sim_env = Simulator.get_env()
        if self.debug:
            sim_env["COSIM_DEBUG_FILE"] = "cosim_debug.log"
            if "DEBUG_PERIOD" not in sim_env:
                # Slow the simulation down to one tick per millisecond.
                sim_env["DEBUG_PERIOD"] = "1"
        rcmd = self.run_command(gui)
        # Start process with asynchronous output capture.
        proc, threads = self._start_process_with_callbacks(
            rcmd,
            env=sim_env,
            cwd=self.run_dir,
            stdout_cb=self._run_stdout_cb,
            stderr_cb=self._run_stderr_cb,
            wait=False)

        try:
            # Get the port which the simulation RPC selected.
            check_count = 0
            while True:
                # Sample liveness before reading so that a simulation which
                # writes the file and then exits is still observed.
                exited = proc.poll() is not None
                port = self._read_port(port_file)
                if port >= 0:
                    break
                if exited:
                    raise RuntimeError("Simulation exited early")
                time.sleep(0.1)
                check_count += 1
                # In GUI mode the user may take arbitrarily long to start the
                # run, so only time out when headless.
                if check_count > 500 and not gui:
                    raise RuntimeError(
                        f"Cosim never wrote cfg file: {port_file}")

            # Wait for the simulation to start accepting RPC connections.
            check_count = 0
            while not is_port_open(port):
                check_count += 1
                if check_count > 200:
                    raise RuntimeError(
                        f"Cosim RPC port ({port}) never opened")
                if proc.poll() is not None:
                    raise RuntimeError("Simulation exited early")
                time.sleep(0.05)
        except BaseException:
            # The simulation runs in its own session/process group, so nobody
            # else will stop it if startup fails. Best-effort cleanup, then
            # propagate the original error.
            if proc.poll() is None:
                try:
                    SimProcess(proc=proc, port=-1, threads=threads,
                               gui=gui).force_stop()
                except Exception:
                    proc.kill()
            raise
        return SimProcess(proc=proc, port=port, threads=threads, gui=gui)

    def _start_process_with_callbacks(
        self, cmd: List[str], env: Optional[Dict[str, str]],
        cwd: Optional[Path], stdout_cb: Optional[Callable[[str], None]],
        stderr_cb: Optional[Callable[[str], None]], wait: bool
    ) -> int | tuple[subprocess.Popen, List[threading.Thread]]:
        """Start a subprocess and stream its stdout/stderr to callbacks.

        The process is started in its own session (POSIX) or process group
        (Windows). Each output line is passed to the matching callback with
        the trailing newline removed; output is drained even when the callback
        is None.

        If wait is True, blocks until process completes and returns its exit
        code. If wait is False, returns a (Popen, reader threads) tuple and
        the threads keep streaming.
        """
        popen_kwargs = dict(stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE,
                            env=env,
                            cwd=cwd,
                            text=True)
        if os.name == "posix":
            # Same effect as preexec_fn=os.setsid, but safe to use in a
            # process which has threads (as this one does).
            popen_kwargs["start_new_session"] = True
        else:  # windows
            popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP
        proc = subprocess.Popen(cmd, **popen_kwargs)

        def _reader(pipe, cb):
            if pipe is None:
                return
            # Close the pipe once EOF is reached so the descriptor is released.
            with pipe:
                for raw in pipe:
                    line = raw[:-1] if raw.endswith('\n') else raw
                    if cb:
                        try:
                            cb(line)
                        except Exception as e:
                            # A faulty callback must not stop the draining of
                            # the pipe, or the child could block on a full pipe.
                            print(
                                f"Exception in simulator output callback: {e}")

        threads: List[threading.Thread] = [
            threading.Thread(target=_reader,
                             args=(proc.stdout, stdout_cb),
                             daemon=True),
            threading.Thread(target=_reader,
                             args=(proc.stderr, stderr_cb),
                             daemon=True),
        ]
        for t in threads:
            t.start()
        if wait:
            for t in threads:
                t.join()
            return proc.wait()
        return proc, threads

    def run(self,
            inner_command: str,
            gui: bool = False,
            server_only: bool = False) -> int:
        """Start the simulation then run the command specified. Kill the
        simulation when the command exits.

        Args:
            inner_command: Command to run against the simulation. It is given
                the connection info via the ESI_COSIM_PORT and ESI_COSIM_HOST
                environment variables.
            gui: Run the simulator in GUI mode; after the inner command exits,
                wait for the simulator itself to exit.
            server_only: Don't run `inner_command`; keep the simulation up
                until the user presses a key.

        Returns:
            0 in server-only mode, otherwise the inner command's exit code.
        """
        # NOTE(review): `inner_command` is annotated `str` but is handed to
        # subprocess.run() without shell=True -- presumably callers pass an
        # argv list; confirm.

        # 'simProc' is accessed in the finally block. Declare it here so that
        # it is bound even if run_proc() raises.
        simProc = None
        try:
            simProc = self.run_proc(gui=gui)
            if server_only:
                # wait for user input to kill the server
                input(
                    f"Running in server-only mode on port {simProc.port} - Press anything to kill the server..."
                )
                return 0
            else:
                # Run the inner command, passing the connection info via
                # environment vars.
                testEnv = os.environ.copy()
                testEnv["ESI_COSIM_PORT"] = str(simProc.port)
                testEnv["ESI_COSIM_HOST"] = "localhost"
                ret = subprocess.run(inner_command,
                                     cwd=os.getcwd(),
                                     env=testEnv).returncode
                if simProc.gui:
                    print("GUI mode - waiting for simulator to exit...")
                    simProc.proc.wait()
                return ret
        finally:
            if simProc and simProc.proc.poll() is None:
                simProc.force_stop()
static void print(TypedAttr val, llvm::raw_ostream &os)
static StringAttr append(StringAttr base, const Twine &suffix)
Return a attribute with the specified suffix appended.
__init__(self, subprocess.Popen proc, int port, Optional[List[threading.Thread]] threads=None, bool gui=False)
Definition simulator.py:86
int|tuple[subprocess.Popen, List[threading.Thread]] _start_process_with_callbacks(self, List[str] cmd, Optional[Dict[str, str]] env, Optional[Path] cwd, Optional[Callable[[str], None]] stdout_cb, Optional[Callable[[str], None]] stderr_cb, bool wait)
Definition simulator.py:301
int run(self, str inner_command, bool gui=False, bool server_only=False)
Definition simulator.py:355
__init__(self, SourceFiles sources, Path run_dir, bool debug, Optional[Callable[[str], None]] run_stdout_callback=None, Optional[Callable[[str], None]] run_stderr_callback=None, Optional[Callable[[str], None]] compile_stdout_callback=None, Optional[Callable[[str], None]] compile_stderr_callback=None, bool make_default_logs=True, Optional[Dict[str, str]] macro_definitions=None)
Definition simulator.py:124
List[List[str]] compile_commands(self)
Definition simulator.py:196
Dict[str, str] get_env()
Definition simulator.py:186
List[str] run_command(self, bool gui)
Definition simulator.py:228
SimProcess run_proc(self, bool gui=False)
Definition simulator.py:232
List[Path] rtl_sources(self)
Definition simulator.py:75
add_file(self, Path file)
Definition simulator.py:44
None __init__(self, str top)
Definition simulator.py:29
List[Path] dpi_so_paths(self)
Definition simulator.py:59
add_dir(self, Path dir)
Definition simulator.py:51
bool is_port_open(port)
Definition simulator.py:19