# Copyright 2022 Google LLC.
# SPDX-License-Identifier: Apache-2.0
"""Module for job counters, limiting the amount of concurrent executions."""

import fcntl
import functools
import logging
import multiprocessing
import os
import re
import select
import selectors
import subprocess
import sys

logger = logging.getLogger('twister')
logger.setLevel(logging.DEBUG)


class JobHandle:
    """Small object to handle claim of a job.

    Used as a context manager: leaving the ``with`` block calls
    ``release_func(*args, **kwargs)`` to give the job back.
    """

    def __init__(self, release_func, *args, **kwargs):
        """Store the release callback (may be None) and its arguments."""
        self.release_func = release_func
        self.args = args
        self.kwargs = kwargs

    def __enter__(self):
        # Return the handle so ``with client.get_job() as handle:`` binds
        # something useful instead of None.
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # The job is released whether or not the body raised; the exception
        # (if any) is not suppressed because nothing truthy is returned.
        if self.release_func:
            self.release_func(*self.args, **self.kwargs)


class JobClient:
    """Abstract base class for all job clients."""

    def get_job(self):
        """Claim a job.

        The base implementation imposes no limit: the returned handle has no
        release callback.
        """
        return JobHandle(None)

    @staticmethod
    def env():
        """Get the environment variables necessary to share the job server."""
        return {}

    @staticmethod
    def pass_fds():
        """Returns the file descriptors that should be passed to subprocesses."""
        return []

    def popen(self, argv, **kwargs):
        """Start a process using subprocess.Popen

        The child's environment is the caller-supplied ``env`` (or
        ``os.environ`` when none is given) overlaid with ``self.env()``, and
        ``self.pass_fds()`` is appended to any caller-supplied ``pass_fds``.
        Neither ``os.environ`` nor the caller's objects are modified.

        All other arguments are passed to subprocess.Popen.

        Returns:
            A Popen object.
        """
        base_env = kwargs.get("env")
        if base_env is None:
            base_env = os.environ
        # Work on copies: updating os.environ in place would leak MAKEFLAGS
        # into every later subprocess started without the matching pass_fds.
        child_env = dict(base_env)
        child_env.update(self.env())
        kwargs["env"] = child_env
        kwargs["pass_fds"] = [
            *(kwargs.get("pass_fds") or ()),
            *self.pass_fds(),
        ]

        return subprocess.Popen(argv, **kwargs)


class GNUMakeJobClient(JobClient):
    """A job client for GNU make.

    A client of jobserver is allowed to run 1 job without contacting the
    jobserver, so maintain an optional self._internal_pipe to hold that
    job.
    """

    def __init__(self, inheritable_pipe, jobs, internal_jobs=0, makeflags=None):
        """Set up the token pipes and register them with a selector.

        Args:
            inheritable_pipe: (read_fd, write_fd) of the shared jobserver
                pipe, or None when no jobserver is in use.
            jobs: The job count reported to subprocesses via MAKEFLAGS.
            internal_jobs: Number of tokens to preload into a private pipe.
            makeflags: MAKEFLAGS value to hand to subprocesses verbatim; when
                None, env() synthesizes one.
        """
        self._makeflags = makeflags
        self._inheritable_pipe = inheritable_pipe
        self.jobs = jobs
        self._selector = selectors.DefaultSelector()
        if internal_jobs:
            self._internal_pipe = os.pipe()
            os.write(self._internal_pipe[1], b"+" * internal_jobs)
            # Non-blocking so a lost race for a token surfaces as
            # BlockingIOError in get_job() rather than a hang in os.read().
            os.set_blocking(self._internal_pipe[0], False)
            # The write end travels as the selector key's data so get_job()
            # knows where to return the token it took.
            self._selector.register(
                self._internal_pipe[0],
                selectors.EVENT_READ,
                self._internal_pipe[1],
            )
        else:
            self._internal_pipe = None
        if self._inheritable_pipe is not None:
            os.set_blocking(self._inheritable_pipe[0], False)
            self._selector.register(
                self._inheritable_pipe[0],
                selectors.EVENT_READ,
                self._inheritable_pipe[1],
            )

    def __del__(self):
        # __init__ may have raised before every attribute was assigned, and
        # __del__ may run more than once, so look attributes up defensively
        # and clear them once their descriptors are closed.
        selector = getattr(self, "_selector", None)
        if selector is not None:
            selector.close()
        for attr in ("_inheritable_pipe", "_internal_pipe"):
            pipe = getattr(self, attr, None)
            if pipe:
                setattr(self, attr, None)
                os.close(pipe[0])
                os.close(pipe[1])

    @staticmethod
    def _validate_pipe(pipe):
        """Return ``pipe`` if both descriptors are usable, otherwise None.

        The read end must be open O_RDONLY and the write end O_WRONLY. A
        warning is logged for the first problem found.
        """
        checks = (
            (pipe[0], os.O_RDONLY, "readable"),
            (pipe[1], os.O_WRONLY, "writable"),
        )
        try:
            for fd, wanted_mode, ability in checks:
                # F_GETFL raises OSError when the descriptor is not open.
                flags = fcntl.fcntl(fd, fcntl.F_GETFL)
                if flags & os.O_ACCMODE != wanted_mode:
                    logger.warning(
                        "FD %d is not %s (flags=%x);"
                        " ignoring GNU make jobserver",
                        fd,
                        ability,
                        flags,
                    )
                    return None
        except OSError:
            logger.warning("No file descriptors; ignoring GNU make jobserver")
            return None
        logger.info("using GNU make jobserver")
        return pipe

    @classmethod
    def from_environ(cls, env=None, jobs=0):
        """Create a job client from an environment with the MAKEFLAGS variable.

        If we are started under a GNU Make Job Server, we can search
        the environment for a string "--jobserver-auth=R,W", where R
        and W will be the read and write file descriptors to the pipe
        respectively. If MAKEFLAGS is missing or empty, None is returned.
        If the jobserver string is absent or its descriptors are unusable,
        a client without a shared pipe is returned.

        The specification for MAKEFLAGS is:
          * If the first char is "n", this is a dry run, just exit.
          * If the flags contains -j1, go to sequential mode.
          * If the flags contains --jobserver-auth=R,W AND those file
            descriptors are valid, use the jobserver. Otherwise output a
            warning.

        Args:
            env: Optionally, the environment to search.
            jobs: The number of jobs set by the user on the command line.

        Returns:
            A GNUMakeJobClient configured appropriately or None if there is
            no MAKEFLAGS environment variable.

        Raises:
            SystemExit: With status 0 when MAKEFLAGS starts with "n".
        """
        if env is None:
            env = os.environ
        makeflags = env.get("MAKEFLAGS")
        if not makeflags:
            return None

        pipe = None
        auth = re.search(r"--jobserver-auth=(\d+),(\d+)", makeflags)
        if auth:
            if jobs:
                logger.warning(
                    "-jN forced on command line; ignoring GNU make jobserver"
                )
            else:
                pipe = cls._validate_pipe([int(fd) for fd in auth.groups()])

        if not jobs:
            jobs_flag = re.search(r"-j(\d+)", makeflags)
            if jobs_flag:
                jobs = int(jobs_flag.group(1))
                if jobs == 1:
                    logger.info("Running in sequential mode (-j1)")

        # NOTE(review): only the first character is inspected; the GNU make
        # manual describes the dry-run marker as an "n" within the first
        # word of MAKEFLAGS - confirm whether e.g. "kn" should also match.
        if makeflags[0] == "n":
            logger.info("MAKEFLAGS contained dry-run flag")
            sys.exit(0)
        return cls(pipe, jobs, internal_jobs=1, makeflags=makeflags)

    def get_job(self):
        """Claim a job.

        Blocks until a one-byte token can be read from one of the registered
        pipes. Releasing the handle writes that byte back to the same pipe.

        Returns:
            A JobHandle object.
        """
        while True:
            for key, _events in self._selector.select():
                try:
                    token = os.read(key.fd, 1)
                except BlockingIOError:
                    # Another process took the token between select() and
                    # read(); try the next ready pipe, then wait again.
                    continue
                return JobHandle(functools.partial(os.write, key.data, token))

    def env(self):
        """Get the environment variables necessary to share the job server."""
        if self._makeflags:
            # Forward the MAKEFLAGS we were started with unchanged.
            return {"MAKEFLAGS": self._makeflags}
        flag = ""
        if self.jobs:
            flag += f" -j{self.jobs}"
        if self.jobs != 1 and self._inheritable_pipe is not None:
            read_fd, write_fd = self._inheritable_pipe
            flag += f" --jobserver-auth={read_fd},{write_fd}"
        return {"MAKEFLAGS": flag}

    def pass_fds(self):
        """Returns the file descriptors that should be passed to subprocesses."""
        if self.jobs != 1 and self._inheritable_pipe is not None:
            return self._inheritable_pipe
        return []


class GNUMakeJobServer(GNUMakeJobClient):
    """Implements a GNU Make POSIX Job Server.

    See https://www.gnu.org/software/make/manual/html_node/POSIX-Jobserver.html
    for specification.
    """

    def __init__(self, jobs=0):
        """Create the shared pipe and preload it with one token per job.

        Args:
            jobs: Number of concurrent jobs; 0 means one per CPU. Values
                above select.PIPE_BUF are reduced to select.PIPE_BUF.
        """
        if not jobs:
            jobs = multiprocessing.cpu_count()
        elif jobs > select.PIPE_BUF:
            jobs = select.PIPE_BUF
        super().__init__(os.pipe(), jobs)

        os.write(self._inheritable_pipe[1], b"+" * jobs)