1# Copyright 2022 Google LLC.
2# SPDX-License-Identifier: Apache-2.0
3"""Module for job counters, limiting the amount of concurrent executions."""
5import fcntl
6import functools
7import logging
8import multiprocessing
9import os
10import re
11import select
12import selectors
13import subprocess
14import sys
16logger = logging.getLogger('twister')
class JobHandle:
    """Small object to handle claim of a job.

    Intended to be used as a context manager: the job is considered claimed
    while the ``with`` body runs and is released when the body exits,
    whether normally or through an exception.
    """

    def __init__(self, release_func, *args, **kwargs):
        """Remember how to release the job.

        Args:
            release_func: Callable invoked on exit to release the job, or
                None (or any falsy value) when nothing has to be released.
            *args: Positional arguments forwarded to release_func.
            **kwargs: Keyword arguments forwarded to release_func.
        """
        self.release_func = release_func
        self.args = args
        self.kwargs = kwargs

    def __enter__(self):
        """Enter the context.

        Returns:
            This handle, so that ``with client.get_job() as handle:`` binds
            the handle rather than None.
        """
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Release the job; exceptions from the ``with`` body propagate."""
        if self.release_func:
            self.release_func(*self.args, **self.kwargs)
class JobClient:
    """Abstract base class for all job clients.

    The base implementation imposes no limit: get_job() returns a handle
    with nothing to release, and neither environment variables nor file
    descriptors are added to subprocesses.
    """

    def get_job(self):
        """Claim a job.

        Returns:
            A JobHandle object with no release function.
        """
        return JobHandle(None)

    @staticmethod
    def env():
        """Get the environment variables necessary to share the job server."""
        return {}

    @staticmethod
    def pass_fds():
        """Returns the file descriptors that should be passed to subprocesses."""
        return []

    def popen(self, argv, **kwargs):
        """Start a process using subprocess.Popen.

        The child environment is the caller's ``env`` (or os.environ when
        ``env`` is missing or None) overlaid with self.env(); the child's
        ``pass_fds`` is the caller's ``pass_fds`` followed by
        self.pass_fds().  Neither os.environ nor any object supplied by the
        caller is modified.

        Args:
            argv: The command to run, as accepted by subprocess.Popen.
            **kwargs: All other arguments are passed to subprocess.Popen.

        Returns:
            A Popen object.
        """
        base_env = kwargs.get("env")
        if base_env is None:
            base_env = os.environ
        # Work on a copy: updating os.environ (or the caller's mapping) in
        # place would leak the job server settings into every later
        # subprocess started by this process.
        child_env = dict(base_env)
        child_env.update(self.env())
        kwargs["env"] = child_env

        # Build a fresh list so a caller-supplied list is not extended in
        # place and a caller-supplied tuple is accepted as well.
        kwargs["pass_fds"] = [
            *(kwargs.get("pass_fds") or ()),
            *self.pass_fds(),
        ]

        return subprocess.Popen(  # pylint:disable=consider-using-with
            argv, **kwargs
        )
class GNUMakeJobClient(JobClient):
    """A job client for GNU make.

    A client of jobserver is allowed to run 1 job without contacting the
    jobserver, so maintain an optional self._internal_pipe to hold that
    job.
    """

    def __init__(self, inheritable_pipe, jobs, internal_jobs=0, makeflags=None):
        """Set up the token pipes and register them for polling.

        Args:
            inheritable_pipe: A (read_fd, write_fd) pair for the job server
                pipe that is handed to subprocesses, or None when there is
                no such pipe.
            jobs: The job count advertised to subprocesses as -jN by env().
            internal_jobs: Number of tokens preloaded into a private pipe
                that is not included in pass_fds().
            makeflags: A MAKEFLAGS string returned verbatim by env(), or
                None to let env() build one from jobs and the pipe.
        """
        self._makeflags = makeflags
        self._inheritable_pipe = inheritable_pipe
        self.jobs = jobs
        self._selector = selectors.DefaultSelector()
        if internal_jobs:
            self._internal_pipe = os.pipe()
            os.write(self._internal_pipe[1], b"+" * internal_jobs)
            os.set_blocking(self._internal_pipe[0], False)
            # The write end is stored as selector data so get_job() knows
            # where to return a token read from this pipe.
            self._selector.register(
                self._internal_pipe[0],
                selectors.EVENT_READ,
                self._internal_pipe[1],
            )
        else:
            self._internal_pipe = None
        if self._inheritable_pipe is not None:
            os.set_blocking(self._inheritable_pipe[0], False)
            self._selector.register(
                self._inheritable_pipe[0],
                selectors.EVENT_READ,
                self._inheritable_pipe[1],
            )

    def __del__(self):
        """Close the selector and both pipes, on a best-effort basis."""
        # __init__ may have failed part-way, so attributes can be missing.
        selector = getattr(self, "_selector", None)
        if selector is not None:
            selector.close()
        pipes = (
            getattr(self, "_inheritable_pipe", None),
            getattr(self, "_internal_pipe", None),
        )
        for pipe in pipes:
            if not pipe:
                continue
            for fd in pipe:
                try:
                    os.close(fd)
                except OSError:
                    # Already closed or invalid; keep closing the others.
                    pass

    @staticmethod
    def _is_dry_run(makeflags):
        """Tell whether the first word of MAKEFLAGS carries the "n" flag.

        Only a first word made of single-letter flags is considered: a
        value that starts with whitespace has an empty first word, and a
        first word that starts with "-" or contains "=" is an option or a
        variable assignment rather than a flag group.
        """
        if not makeflags or makeflags[0].isspace():
            return False
        first_word = makeflags.split(maxsplit=1)[0]
        if first_word.startswith("-") or "=" in first_word:
            return False
        return "n" in first_word

    @classmethod
    def from_environ(cls, env=None, jobs=0):
        """Create a job client from an environment with the MAKEFLAGS variable.

        If we are started under a GNU Make Job Server, we can search
        the environment for a string "--jobserver-auth=R,W", where R
        and W will be the read and write file descriptors to the pipe
        respectively.  If the string is missing, or the descriptors are
        not usable, the returned client does not use a job server pipe.

        The specification for MAKEFLAGS is:
          * If the first word contains "n", this is a dry run, just exit.
          * If the flags contains -j1, go to sequential mode.
          * If the flags contains --jobserver-auth=R,W AND those file
            descriptors are valid, use the jobserver. Otherwise output a
            warning.

        Args:
            env: Optionally, the environment to search.
            jobs: The number of jobs set by the user on the command line.

        Returns:
            A GNUMakeJobClient configured appropriately or None if there is
            no MAKEFLAGS environment variable.

        Raises:
            SystemExit: With status 0 when MAKEFLAGS requests a dry run.
        """
        if env is None:
            env = os.environ
        makeflags = env.get("MAKEFLAGS")
        if not makeflags:
            return None
        match = re.search(r"--jobserver-auth=(\d+),(\d+)", makeflags)
        if match:
            pipe = [int(x) for x in match.groups()]
            if jobs:
                pipe = None
                logger.warning(
                    "-jN forced on command line; ignoring GNU make jobserver"
                )
            else:
                try:
                    # Use F_GETFL to see if file descriptors are valid
                    if pipe:
                        rc = fcntl.fcntl(pipe[0], fcntl.F_GETFL)
                        if (rc & os.O_ACCMODE) != os.O_RDONLY:
                            logger.warning(
                                "FD %s is not readable (flags=%x); "
                                "ignoring GNU make jobserver", pipe[0], rc)
                            pipe = None
                    if pipe:
                        rc = fcntl.fcntl(pipe[1], fcntl.F_GETFL)
                        if (rc & os.O_ACCMODE) != os.O_WRONLY:
                            logger.warning(
                                "FD %s is not writable (flags=%x); "
                                "ignoring GNU make jobserver", pipe[1], rc)
                            pipe = None
                    if pipe:
                        logger.info("using GNU make jobserver")
                except OSError:
                    # fcntl fails when a descriptor is not open here.
                    pipe = None
                    logger.warning(
                        "No file descriptors; ignoring GNU make jobserver"
                    )
        else:
            pipe = None
        if not jobs:
            match = re.search(r"-j(\d+)", makeflags)
            if match:
                jobs = int(match.group(1))
                if jobs == 1:
                    logger.info("Running in sequential mode (-j1)")
        if cls._is_dry_run(makeflags):
            logger.info("MAKEFLAGS contained dry-run flag")
            sys.exit(0)
        return cls(pipe, jobs, internal_jobs=1, makeflags=makeflags)

    def get_job(self):
        """Claim a job.

        Blocks until a token can be read from one of the registered pipes.

        Returns:
            A JobHandle object that writes the token back to the pipe it
            was read from when released.
        """
        while True:
            ready_items = self._selector.select()
            if ready_items:
                read_fd = ready_items[0][0].fd
                write_fd = ready_items[0][0].data
                try:
                    byte = os.read(read_fd, 1)
                    return JobHandle(
                        functools.partial(os.write, write_fd, byte)
                    )
                except BlockingIOError:
                    # The read end is non-blocking and the token was gone
                    # by the time we read; wait for the next one.
                    pass

    def env(self):
        """Get the environment variables necessary to share the job server."""
        if self._makeflags:
            return {"MAKEFLAGS": self._makeflags}
        flag = ""
        if self.jobs:
            flag += f" -j{self.jobs}"
        if self.jobs != 1 and self._inheritable_pipe is not None:
            flag += f" --jobserver-auth={self._inheritable_pipe[0]},{self._inheritable_pipe[1]}"
        return {"MAKEFLAGS": flag}

    def pass_fds(self):
        """Returns the file descriptors that should be passed to subprocesses."""
        if self.jobs != 1 and self._inheritable_pipe is not None:
            # Return a new list so callers cannot alter the stored pipe.
            return list(self._inheritable_pipe)
        return []
class GNUMakeJobServer(GNUMakeJobClient):
    """Implements a GNU Make POSIX Job Server.

    See https://www.gnu.org/software/make/manual/html_node/POSIX-Jobserver.html
    for specification.
    """

    def __init__(self, jobs=0):
        """Create the job server pipe and fill it with job tokens.

        Args:
            jobs: Number of tokens to provide.  A falsy value selects
                multiprocessing.cpu_count(); any other value is limited to
                select.PIPE_BUF (presumably so that the initial write fits
                in a single pipe write -- confirm against the spec).
        """
        if jobs:
            token_count = min(jobs, select.PIPE_BUF)
        else:
            token_count = multiprocessing.cpu_count()
        server_pipe = os.pipe()
        super().__init__(server_pipe, token_count)
        # One "+" byte per available job slot.
        tokens = b"+" * token_count
        os.write(self._inheritable_pipe[1], tokens)