# Copyright 2022 Google LLC.
# SPDX-License-Identifier: Apache-2.0
"""Module for job counters, limiting the amount of concurrent executions."""

import fcntl
import functools
import logging
import multiprocessing
import os
import re
import select
import selectors
import subprocess
import sys

logger = logging.getLogger('twister')
logger.setLevel(logging.DEBUG)


class JobHandle:
    """Small object to handle claim of a job.

    Used as a context manager: leaving the ``with`` block calls
    ``release_func(*args, **kwargs)`` (when a release function was given),
    which hands the claimed job slot back.
    """

    def __init__(self, release_func, *args, **kwargs):
        self.release_func = release_func
        self.args = args
        self.kwargs = kwargs

    def __enter__(self):
        # Return the handle so ``with client.get_job() as handle:`` is usable.
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Returns None, so exceptions raised inside the block propagate.
        if self.release_func:
            self.release_func(*self.args, **self.kwargs)


class JobClient:
    """Abstract base class for all job clients."""

    def get_job(self):
        """Claim a job.

        The base implementation never limits anything: it returns a handle
        with no release function.
        """
        return JobHandle(None)

    @staticmethod
    def env():
        """Get the environment variables necessary to share the job server."""
        return {}

    @staticmethod
    def pass_fds():
        """Returns the file descriptors that should be passed to subprocesses."""
        return []

    def popen(self, argv, **kwargs):
        """Start a process using subprocess.Popen

        The child environment is the caller-supplied ``env`` (or a snapshot
        of ``os.environ`` when none is given) overlaid with ``self.env()``,
        and ``pass_fds`` is extended with ``self.pass_fds()``. Neither
        ``os.environ`` nor the caller's ``env`` / ``pass_fds`` objects are
        modified.

        All other arguments are passed to subprocess.Popen.

        Returns:
            A Popen object.
        """
        base_env = kwargs.get("env")
        if base_env is None:
            base_env = os.environ
        # Build fresh containers: updating in place would leak MAKEFLAGS into
        # this process' own environment or into the caller's mapping.
        kwargs["env"] = {**base_env, **self.env()}
        kwargs["pass_fds"] = [
            *(kwargs.get("pass_fds") or ()),
            *self.pass_fds(),
        ]

        return subprocess.Popen(  # pylint:disable=consider-using-with
            argv, **kwargs
        )


class GNUMakeJobClient(JobClient):
    """A job client for GNU make.

    A client of jobserver is allowed to run 1 job without contacting the
    jobserver, so maintain an optional self._internal_pipe to hold that
    job.
    """

    def __init__(self, inheritable_pipe, jobs, internal_jobs=0, makeflags=None):
        """Set up the token pipes and the selector that waits on them.

        Args:
            inheritable_pipe: (read_fd, write_fd) of the shared jobserver
                pipe, or None when no jobserver is in use.
            jobs: The job count advertised to children through MAKEFLAGS.
            internal_jobs: Number of tokens held in a private pipe that can
                be claimed without contacting the jobserver.
            makeflags: MAKEFLAGS value to hand to children verbatim; when
                unset, env() builds one from ``jobs`` and the pipe.
        """
        self._makeflags = makeflags
        self._inheritable_pipe = inheritable_pipe
        self.jobs = jobs
        self._selector = selectors.DefaultSelector()
        if internal_jobs:
            self._internal_pipe = os.pipe()
            os.write(self._internal_pipe[1], b"+" * internal_jobs)
            os.set_blocking(self._internal_pipe[0], False)
            # The write end is stored as selector data so get_job() knows
            # where to return the token it reads.
            self._selector.register(
                self._internal_pipe[0],
                selectors.EVENT_READ,
                self._internal_pipe[1],
            )
        else:
            self._internal_pipe = None
        if self._inheritable_pipe is not None:
            os.set_blocking(self._inheritable_pipe[0], False)
            self._selector.register(
                self._inheritable_pipe[0],
                selectors.EVENT_READ,
                self._inheritable_pipe[1],
            )

    def __del__(self):
        # Best-effort cleanup. getattr() guards against __init__ having
        # failed before every attribute was assigned.
        selector = getattr(self, "_selector", None)
        if selector is not None:
            selector.close()
        for pipe in (
            getattr(self, "_inheritable_pipe", None),
            getattr(self, "_internal_pipe", None),
        ):
            if not pipe:
                continue
            for fd in pipe:
                try:
                    os.close(fd)
                except OSError:
                    # Already closed elsewhere; nothing left to release.
                    pass

    @staticmethod
    def _usable_pipe(pipe):
        """Return ``pipe`` if both descriptors are open with the right mode.

        Uses F_GETFL to check that pipe[0] is read-only and pipe[1] is
        write-only. Logs a warning and returns None otherwise.
        """
        checks = zip(
            pipe, (os.O_RDONLY, os.O_WRONLY), ("readable", "writable")
        )
        try:
            for fd, wanted_mode, ability in checks:
                flags = fcntl.fcntl(fd, fcntl.F_GETFL)
                if flags & os.O_ACCMODE != wanted_mode:
                    logger.warning(
                        "FD %s is not %s (flags=%x); "
                        "ignoring GNU make jobserver", fd, ability, flags)
                    return None
        except OSError:
            # fcntl fails when the descriptor is not open in this process.
            logger.warning(
                "No file descriptors; ignoring GNU make jobserver"
            )
            return None
        logger.info("using GNU make jobserver")
        return pipe

    @classmethod
    def from_environ(cls, env=None, jobs=0):
        """Create a job client from an environment with the MAKEFLAGS variable.

        If we are started under a GNU Make Job Server, we can search
        the environment for a string "--jobserver-auth=R,W", where R
        and W will be the read and write file descriptors to the pipe
        respectively. If MAKEFLAGS is missing or empty, None is returned.

        The specification for MAKEFLAGS is:
          * If the first char is "n", this is a dry run, just exit.
          * If the flags contains -j1, go to sequential mode.
          * If the flags contains --jobserver-auth=R,W AND those file
            descriptors are valid, use the jobserver. Otherwise output a
            warning.

        Args:
            env: Optionally, the environment to search.
            jobs: The number of jobs set by the user on the command line.
                A non-zero value overrides (and disables) the jobserver.

        Returns:
            A GNUMakeJobClient configured appropriately or None if there is
            no MAKEFLAGS environment variable.

        Raises:
            SystemExit: (status 0) when MAKEFLAGS starts with the dry-run
                flag "n".
        """
        if env is None:
            env = os.environ
        makeflags = env.get("MAKEFLAGS")
        if not makeflags:
            return None

        pipe = None
        match = re.search(r"--jobserver-auth=(\d+),(\d+)", makeflags)
        if match:
            if jobs:
                logger.warning(
                    "-jN forced on command line; ignoring GNU make jobserver"
                )
            else:
                pipe = cls._usable_pipe([int(x) for x in match.groups()])

        if not jobs:
            match = re.search(r"-j(\d+)", makeflags)
            if match:
                jobs = int(match.group(1))
                if jobs == 1:
                    logger.info("Running in sequential mode (-j1)")
        if makeflags[0] == "n":
            logger.info("MAKEFLAGS contained dry-run flag")
            sys.exit(0)
        return cls(pipe, jobs, internal_jobs=1, makeflags=makeflags)

    def get_job(self):
        """Claim a job.

        Blocks until a token byte can be read from one of the registered
        pipes. The returned handle writes that same byte back to the
        matching write end when it is released.

        Returns:
            A JobHandle object.
        """
        while True:
            for key, _events in self._selector.select():
                try:
                    token = os.read(key.fd, 1)
                except BlockingIOError:
                    # The token was taken between select() and read();
                    # try the next ready pipe, then poll again.
                    continue
                return JobHandle(
                    functools.partial(os.write, key.data, token)
                )

    def env(self):
        """Get the environment variables necessary to share the job server."""
        if self._makeflags:
            return {"MAKEFLAGS": self._makeflags}
        flag = ""
        if self.jobs:
            flag += f" -j{self.jobs}"
        if self.jobs != 1 and self._inheritable_pipe is not None:
            flag += f" --jobserver-auth={self._inheritable_pipe[0]},{self._inheritable_pipe[1]}"
        return {"MAKEFLAGS": flag}

    def pass_fds(self):
        """Returns the file descriptors that should be passed to subprocesses."""
        if self.jobs != 1 and self._inheritable_pipe is not None:
            return self._inheritable_pipe
        return []


class GNUMakeJobServer(GNUMakeJobClient):
    """Implements a GNU Make POSIX Job Server.

    See https://www.gnu.org/software/make/manual/html_node/POSIX-Jobserver.html
    for specification.
    """

    def __init__(self, jobs=0):
        """Create the jobserver pipe and fill it with ``jobs`` tokens.

        Args:
            jobs: Number of tokens to provide. Zero (the default) means one
                per CPU; values above select.PIPE_BUF are clamped to it.
        """
        if not jobs:
            jobs = multiprocessing.cpu_count()
        elif jobs > select.PIPE_BUF:
            jobs = select.PIPE_BUF
        super().__init__(os.pipe(), jobs)

        os.write(self._inheritable_pipe[1], b"+" * jobs)