CIRCT 22.0.0git
Loading...
Searching...
No Matches
simulator.py
Go to the documentation of this file.
1# Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
2# See https://llvm.org/LICENSE.txt for license information.
3# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
4
5import os
6import re
7import signal
8import socket
9import subprocess
10import time
11from pathlib import Path
12from typing import Dict, List, Optional, Callable, IO
13import threading
14
15_thisdir = Path(__file__).parent
16CosimCollateralDir = _thisdir
17
18
19def is_port_open(port) -> bool:
20 """Check if a TCP port is open locally."""
21 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
22 result = sock.connect_ex(('127.0.0.1', port))
23 sock.close()
24 return True if result == 0 else False
25
26
28
29 def __init__(self, top: str) -> None:
30 # User source files.
31 self.user: List[Path] = []
32 # DPI shared objects.
33 self.dpi_so: List[str] = ["EsiCosimDpiServer"]
34 # DPI SV files.
35 self.dpi_sv: List[Path] = [
36 CosimCollateralDir / "Cosim_DpiPkg.sv",
37 CosimCollateralDir / "Cosim_Endpoint.sv",
38 CosimCollateralDir / "Cosim_Manifest.sv",
39 ]
40 # Name of the top module.
41 self.top = top
42
43 def add_file(self, file: Path):
44 """Add a single RTL file to the source list."""
45 if file.is_file():
46 self.user.append(file)
47 else:
48 raise FileNotFoundError(f"File {file} does not exist")
49
50 def add_dir(self, dir: Path):
51 """Add all the RTL files in a directory to the source list."""
52 for file in sorted(dir.iterdir()):
53 if file.is_file() and (file.suffix == ".sv" or file.suffix == ".v"):
54 self.user.append(file)
55 elif file.is_dir():
56 self.add_dir(file)
57
58 def dpi_so_paths(self) -> List[Path]:
59 """Return a list of all the DPI shared object files."""
60
61 def find_so(name: str) -> Path:
62 for path in Simulator.get_env().get("LD_LIBRARY_PATH", "").split(":"):
63 if os.name == "nt":
64 so = Path(path) / f"{name}.dll"
65 else:
66 so = Path(path) / f"lib{name}.so"
67 if so.exists():
68 return so
69 raise FileNotFoundError(f"Could not find {name}.so in LD_LIBRARY_PATH")
70
71 return [find_so(name) for name in self.dpi_so]
72
73 @property
74 def rtl_sources(self) -> List[Path]:
75 """Return a list of all the RTL source files."""
76 return self.dpi_sv + self.user
77
78
80
81 def __init__(self,
82 proc: subprocess.Popen,
83 port: int,
84 threads: Optional[List[threading.Thread]] = None,
85 gui: bool = False):
86 self.proc = proc
87 self.port = port
88 self.threads: List[threading.Thread] = threads or []
89 self.gui = gui
90
91 def force_stop(self):
92 """Make sure to stop the simulation no matter what."""
93 if self.proc:
94 os.killpg(os.getpgid(self.proc.pid), signal.SIGINT)
95 # Allow the simulation time to flush its outputs.
96 try:
97 self.proc.wait(timeout=1.0)
98 except subprocess.TimeoutExpired:
99 # If the simulation doesn't exit of its own free will, kill it.
100 self.proc.kill()
101
102 # Join reader threads (they should exit once pipes are closed).
103 for t in self.threads:
104 t.join()
105
106
108
109 # Some RTL simulators don't use stderr for error messages. Everything goes to
110 # stdout. Boo! They should feel bad about this. Also, they can specify that
111 # broken behavior by overriding this.
112 UsesStderr = True
113
114 def __init__(self,
115 sources: SourceFiles,
116 run_dir: Path,
117 debug: bool,
118 run_stdout_callback: Optional[Callable[[str], None]] = None,
119 run_stderr_callback: Optional[Callable[[str], None]] = None,
120 compile_stdout_callback: Optional[Callable[[str], None]] = None,
121 compile_stderr_callback: Optional[Callable[[str], None]] = None,
122 make_default_logs: bool = True,
123 macro_definitions: Optional[Dict[str, str]] = None):
124 """Simulator base class.
125
126 Optional sinks can be provided for capturing output. If not provided,
127 the simulator will write to log files in `run_dir`.
128
129 Args:
130 sources: SourceFiles describing RTL/DPI inputs.
131 run_dir: Directory where build/run artifacts are placed.
132 debug: Enable cosim debug mode.
133 run_stdout_callback: Line-based callback for runtime stdout.
134 run_stderr_callback: Line-based callback for runtime stderr.
135 compile_stdout_callback: Line-based callback for compile stdout.
136 compile_stderr_callback: Line-based callback for compile stderr.
137 make_default_logs: If True and corresponding callback is not supplied,
138 create log file and emit via internally-created callback.
139 macro_definitions: Optional dictionary of macro definitions to be defined
140 during compilation.
141 """
142 self.sources = sources
143 self.run_dir = run_dir
144 self.debug = debug
145 self.macro_definitions = macro_definitions
146
147 # Unified list of any log file handles we opened.
148 self._default_files: List[IO[str]] = []
149
150 def _ensure_default(cb: Optional[Callable[[str], None]], filename: str):
151 """Return (callback, file_handle_or_None) with optional file creation.
152
153 Behavior:
154 * If a callback is provided, return it unchanged with no file.
155 * If no callback and make_default_logs is False, return (None, None).
156 * If no callback and make_default_logs is True, create a log file and
157 return a writer callback plus the opened file handle.
158 """
159 if cb is not None:
160 return cb, None
161 if not make_default_logs:
162 return None, None
163 p = self.run_dir / filename
164 p.parent.mkdir(parents=True, exist_ok=True)
165 logf = p.open("w+")
166 self._default_files.append(logf)
167
168 def _writer(line: str, _lf=logf):
169 _lf.write(line + "\n")
170 _lf.flush()
171
172 return _writer, logf
173
174 # Initialize all four (compile/run stdout/stderr) uniformly.
175 self._compile_stdout_cb, self._compile_stdout_log = _ensure_default(
176 compile_stdout_callback, 'compile_stdout.log')
177 self._compile_stderr_cb, self._compile_stderr_log = _ensure_default(
178 compile_stderr_callback, 'compile_stderr.log')
179 self._run_stdout_cb, self._run_stdout_log = _ensure_default(
180 run_stdout_callback, 'sim_stdout.log')
181 self._run_stderr_cb, self._run_stderr_log = _ensure_default(
182 run_stderr_callback, 'sim_stderr.log')
183
184 @staticmethod
185 def get_env() -> Dict[str, str]:
186 """Get the environment variables to locate shared objects."""
187
188 env = os.environ.copy()
189 env["LIBRARY_PATH"] = env.get("LIBRARY_PATH", "") + ":" + str(
190 _thisdir.parent / "lib")
191 env["LD_LIBRARY_PATH"] = env.get("LD_LIBRARY_PATH", "") + ":" + str(
192 _thisdir.parent / "lib")
193 return env
194
195 def compile_commands(self) -> List[List[str]]:
196 """Compile the sources. Returns the exit code of the simulation compiler."""
197 assert False, "Must be implemented by subclass"
198
199 def compile(self) -> int:
200 cmds = self.compile_commands()
201 self.run_dir.mkdir(parents=True, exist_ok=True)
202 for cmd in cmds:
204 cmd,
205 env=Simulator.get_env(),
206 cwd=None,
207 stdout_cb=self._compile_stdout_cb,
208 stderr_cb=self._compile_stderr_cb,
209 wait=True)
210 if isinstance(ret, int) and ret != 0:
211 print("====== Compilation failure")
212
213 # If we have the default file loggers, print the compilation logs to
214 # console. Else, assume that the user has already captured them.
215 if self.UsesStderr:
216 if self._compile_stderr_log is not None:
217 self._compile_stderr_log.seek(0)
218 print(self._compile_stderr_log.read())
219 else:
220 if self._compile_stdout_log is not None:
221 self._compile_stdout_log.seek(0)
222 print(self._compile_stdout_log.read())
223
224 return ret
225 return 0
226
227 def run_command(self, gui: bool) -> List[str]:
228 """Return the command to run the simulation."""
229 assert False, "Must be implemented by subclass"
230
231 def run_proc(self, gui: bool = False) -> SimProcess:
232 """Run the simulation process. Returns the Popen object and the port which
233 the simulation is listening on.
234
235 If user-provided stdout/stderr sinks were supplied in the constructor,
236 they are used. Otherwise, log files are created in `run_dir`.
237 """
238 self.run_dir.mkdir(parents=True, exist_ok=True)
239
240 env_gui = os.environ.get("COSIM_GUI", "0")
241 if env_gui != "0":
242 gui = True
243
244 # Erase the config file if it exists. We don't want to read
245 # an old config.
246 portFileName = self.run_dir / "cosim.cfg"
247 if os.path.exists(portFileName):
248 os.remove(portFileName)
249
250 # Run the simulation.
251 simEnv = Simulator.get_env()
252 if self.debug:
253 simEnv["COSIM_DEBUG_FILE"] = "cosim_debug.log"
254 if "DEBUG_PERIOD" not in simEnv:
255 # Slow the simulation down to one tick per millisecond.
256 simEnv["DEBUG_PERIOD"] = "1"
257 rcmd = self.run_command(gui)
258 # Start process with asynchronous output capture.
259 proc, threads = self._start_process_with_callbacks(
260 rcmd,
261 env=simEnv,
262 cwd=self.run_dir,
263 stdout_cb=self._run_stdout_cb,
264 stderr_cb=self._run_stderr_cb,
265 wait=False)
266
267 # Get the port which the simulation RPC selected.
268 checkCount = 0
269 while (not os.path.exists(portFileName)) and \
270 proc.poll() is None:
271 time.sleep(0.1)
272 checkCount += 1
273 if checkCount > 500 and not gui:
274 raise Exception(f"Cosim never wrote cfg file: {portFileName}")
275 port = -1
276 while port < 0:
277 portFile = open(portFileName, "r")
278 for line in portFile.readlines():
279 m = re.match("port: (\\d+)", line)
280 if m is not None:
281 port = int(m.group(1))
282 portFile.close()
283
284 # Wait for the simulation to start accepting RPC connections.
285 checkCount = 0
286 while not is_port_open(port):
287 checkCount += 1
288 if checkCount > 200:
289 raise Exception(f"Cosim RPC port ({port}) never opened")
290 if proc.poll() is not None:
291 raise Exception("Simulation exited early")
292 time.sleep(0.05)
293 return SimProcess(proc=proc, port=port, threads=threads, gui=gui)
294
296 self, cmd: List[str], env: Optional[Dict[str, str]], cwd: Optional[Path],
297 stdout_cb: Optional[Callable[[str],
298 None]], stderr_cb: Optional[Callable[[str],
299 None]],
300 wait: bool) -> int | tuple[subprocess.Popen, List[threading.Thread]]:
301 """Start a subprocess and stream its stdout/stderr to callbacks.
302
303 If wait is True, blocks until process completes and returns its exit code.
304 If wait is False, returns the Popen object (threads keep streaming).
305 """
306 if os.name == "posix":
307 proc = subprocess.Popen(cmd,
308 stdout=subprocess.PIPE,
309 stderr=subprocess.PIPE,
310 env=env,
311 cwd=cwd,
312 text=True,
313 preexec_fn=os.setsid)
314 else: # windows
315 proc = subprocess.Popen(cmd,
316 stdout=subprocess.PIPE,
317 stderr=subprocess.PIPE,
318 env=env,
319 cwd=cwd,
320 text=True,
321 creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
322
323 def _reader(pipe, cb):
324 if pipe is None:
325 return
326 for raw in pipe:
327 if raw.endswith('\n'):
328 raw = raw[:-1]
329 if cb:
330 try:
331 cb(raw)
332 except Exception as e:
333 print(f"Exception in simulator output callback: {e}")
334
335 threads: List[threading.Thread] = [
336 threading.Thread(target=_reader,
337 args=(proc.stdout, stdout_cb),
338 daemon=True),
339 threading.Thread(target=_reader,
340 args=(proc.stderr, stderr_cb),
341 daemon=True),
342 ]
343 for t in threads:
344 t.start()
345 if wait:
346 for t in threads:
347 t.join()
348 return proc.wait()
349 return proc, threads
350
351 def run(self,
352 inner_command: str,
353 gui: bool = False,
354 server_only: bool = False) -> int:
355 """Start the simulation then run the command specified. Kill the simulation
356 when the command exits."""
357
358 # 'simProc' is accessed in the finally block. Declare it here to avoid
359 # syntax errors in that block.
360 simProc = None
361 try:
362 simProc = self.run_proc(gui=gui)
363 if server_only:
364 # wait for user input to kill the server
365 input(
366 f"Running in server-only mode on port {simProc.port} - Press anything to kill the server..."
367 )
368 return 0
369 else:
370 # Run the inner command, passing the connection info via environment vars.
371 testEnv = os.environ.copy()
372 testEnv["ESI_COSIM_PORT"] = str(simProc.port)
373 testEnv["ESI_COSIM_HOST"] = "localhost"
374 ret = subprocess.run(inner_command, cwd=os.getcwd(),
375 env=testEnv).returncode
376 if simProc.gui:
377 print("GUI mode - waiting for simulator to exit...")
378 simProc.proc.wait()
379 return ret
380 finally:
381 if simProc and simProc.proc.poll() is None:
382 simProc.force_stop()
static void print(TypedAttr val, llvm::raw_ostream &os)
static StringAttr append(StringAttr base, const Twine &suffix)
Return a attribute with the specified suffix appended.
__init__(self, subprocess.Popen proc, int port, Optional[List[threading.Thread]] threads=None, bool gui=False)
Definition simulator.py:85
int|tuple[subprocess.Popen, List[threading.Thread]] _start_process_with_callbacks(self, List[str] cmd, Optional[Dict[str, str]] env, Optional[Path] cwd, Optional[Callable[[str], None]] stdout_cb, Optional[Callable[[str], None]] stderr_cb, bool wait)
Definition simulator.py:300
int run(self, str inner_command, bool gui=False, bool server_only=False)
Definition simulator.py:354
__init__(self, SourceFiles sources, Path run_dir, bool debug, Optional[Callable[[str], None]] run_stdout_callback=None, Optional[Callable[[str], None]] run_stderr_callback=None, Optional[Callable[[str], None]] compile_stdout_callback=None, Optional[Callable[[str], None]] compile_stderr_callback=None, bool make_default_logs=True, Optional[Dict[str, str]] macro_definitions=None)
Definition simulator.py:123
List[List[str]] compile_commands(self)
Definition simulator.py:195
Dict[str, str] get_env()
Definition simulator.py:185
List[str] run_command(self, bool gui)
Definition simulator.py:227
SimProcess run_proc(self, bool gui=False)
Definition simulator.py:231
List[Path] rtl_sources(self)
Definition simulator.py:74
add_file(self, Path file)
Definition simulator.py:43
None __init__(self, str top)
Definition simulator.py:29
List[Path] dpi_so_paths(self)
Definition simulator.py:58
add_dir(self, Path dir)
Definition simulator.py:50
bool is_port_open(port)
Definition simulator.py:19