1# Copyright 2022 Google LLC.
2# SPDX-License-Identifier: Apache-2.0
3"""Module for job counters, limiting the amount of concurrent executions."""
4
5import fcntl
6import functools
7import logging
8import multiprocessing
9import os
10import re
11import select
12import selectors
13import subprocess
14import sys
15
# Module-wide logger; all messages from this module go through the logger
# named 'twister'.
logger = logging.getLogger('twister')
# NOTE: this sets the level of the 'twister' logger itself to DEBUG as a side
# effect of importing this module.
logger.setLevel(logging.DEBUG)
18
class JobHandle:
    """Context manager representing one claimed job.

    Entering the context does nothing besides returning the handle.  On exit
    the stored ``release_func`` is called with the stored arguments so the
    job is given back, whether or not the ``with`` body raised.
    """

    def __init__(self, release_func, *args, **kwargs):
        """Remember how to release the job.

        Args:
            release_func: Callable invoked on exit, or a falsy value (for
                example None) when there is nothing to release.
            *args: Positional arguments forwarded to release_func.
            **kwargs: Keyword arguments forwarded to release_func.
        """
        self.release_func = release_func
        self.args = args
        self.kwargs = kwargs

    def __enter__(self):
        """Enter the context.

        Returns:
            This handle, so ``with client.get_job() as handle:`` binds the
            handle instead of None.
        """
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Release the job.

        Returns None, so an exception raised inside the ``with`` body is
        never suppressed.
        """
        if self.release_func:
            self.release_func(*self.args, **self.kwargs)
33
34
class JobClient:
    """Base class for all job clients.

    This default implementation imposes no limit: get_job() hands out a
    handle with nothing to release, and no environment variables or file
    descriptors are shared with subprocesses.
    """

    def get_job(self):
        """Claim a job.

        Returns:
            A JobHandle with no release function.
        """
        return JobHandle(None)

    @staticmethod
    def env():
        """Get the environment variables necessary to share the job server."""
        return {}

    @staticmethod
    def pass_fds():
        """Returns the file descriptors that should be passed to subprocesses."""
        return []

    def popen(self, argv, **kwargs):
        """Start a process using subprocess.Popen

        The child's environment is the caller-supplied ``env`` (or a
        snapshot of os.environ when none is given) overlaid with self.env(),
        and self.pass_fds() is appended to any caller-supplied ``pass_fds``.
        Neither os.environ nor the caller's objects are modified.

        All other arguments are passed to subprocess.Popen.

        Args:
            argv: The command to run, as accepted by subprocess.Popen.
            **kwargs: Extra keyword arguments for subprocess.Popen.

        Returns:
            A Popen object.
        """
        base_env = kwargs.get("env")
        if base_env is None:
            base_env = os.environ
        # Build a fresh dict: updating in place would leak the job server
        # variables into this process' os.environ or the caller's mapping.
        kwargs["env"] = {**base_env, **self.env()}

        # Build a fresh list: ``+=`` would mutate the caller's list and
        # fails outright when the caller passed a tuple.
        caller_fds = kwargs.get("pass_fds") or ()
        kwargs["pass_fds"] = [*caller_fds, *self.pass_fds()]

        return subprocess.Popen(argv, **kwargs)
66
67
class GNUMakeJobClient(JobClient):
    """A job client for GNU make.

    A client of jobserver is allowed to run 1 job without contacting the
    jobserver, so maintain an optional self._internal_pipe to hold that
    job.
    """

    def __init__(self, inheritable_pipe, jobs, internal_jobs=0, makeflags=None):
        """Set up the token pipes and the selector that watches them.

        Args:
            inheritable_pipe: (read_fd, write_fd) pair of the job server
                pipe shared with subprocesses, or None when there is none.
            jobs: The job count advertised to subprocesses via -jN.
            internal_jobs: Number of tokens to place in a private pipe that
                is not shared with subprocesses; 0 creates no private pipe.
            makeflags: MAKEFLAGS value to hand to subprocesses verbatim, or
                None to let env() build one.
        """
        self._makeflags = makeflags
        self._inheritable_pipe = inheritable_pipe
        self.jobs = jobs
        # Assigned before anything that can fail so __del__ always finds it.
        self._internal_pipe = None
        self._selector = selectors.DefaultSelector()
        if internal_jobs:
            self._internal_pipe = os.pipe()
            os.write(self._internal_pipe[1], b"+" * internal_jobs)
            # Non-blocking so get_job() can fall through when a token that
            # select() reported is gone by the time it is read.
            os.set_blocking(self._internal_pipe[0], False)
            # The write end travels as the key's data so a token can be
            # returned to the pipe it was taken from.
            self._selector.register(
                self._internal_pipe[0],
                selectors.EVENT_READ,
                self._internal_pipe[1],
            )
        if self._inheritable_pipe is not None:
            os.set_blocking(self._inheritable_pipe[0], False)
            self._selector.register(
                self._inheritable_pipe[0],
                selectors.EVENT_READ,
                self._inheritable_pipe[1],
            )

    def __del__(self):
        """Close the selector and every pipe file descriptor held."""
        # getattr() guards against a partially constructed instance, where
        # __init__ raised before all attributes were assigned.
        selector = getattr(self, "_selector", None)
        if selector is not None:
            selector.close()
        pipes = (
            getattr(self, "_inheritable_pipe", None),
            getattr(self, "_internal_pipe", None),
        )
        for pipe in pipes:
            if not pipe:
                continue
            for fd in pipe:
                try:
                    os.close(fd)
                except OSError:
                    # Best effort: one descriptor that cannot be closed must
                    # not prevent the remaining ones from being closed.
                    pass

    @staticmethod
    def _fd_has_access_mode(fd, mode, description):
        """Check the access mode of fd, logging a warning on mismatch.

        Args:
            fd: The file descriptor to inspect.
            mode: Expected access mode (os.O_RDONLY or os.O_WRONLY).
            description: Word used in the warning ("readable"/"writable").

        Returns:
            True if the access mode of fd equals mode, False otherwise.

        Raises:
            OSError: If fcntl rejects fd (e.g. it is not an open descriptor).
        """
        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
        if flags & os.O_ACCMODE == mode:
            return True
        logger.warning(
            f"FD {fd} is not {description} (flags={flags:x});"
            " ignoring GNU make jobserver"
        )
        return False

    @classmethod
    def from_environ(cls, env=None, jobs=0):
        """Create a job client from an environment with the MAKEFLAGS variable.

        If we are started under a GNU Make Job Server, we can search
        the environment for a string "--jobserver-auth=R,W", where R
        and W will be the read and write file descriptors to the pipe
        respectively.  If the string is missing, or the descriptors are
        not usable, the job server is ignored (with a warning in the
        latter case) and the client only has its one internal job.

        The specification for MAKEFLAGS is:
          * If the first char is "n", this is a dry run, just exit.
          * If the flags contains -j1, go to sequential mode.
          * If the flags contains --jobserver-auth=R,W AND those file
            descriptors are valid, use the jobserver. Otherwise output a
            warning.

        Args:
            env: Optionally, the environment to search.
            jobs: The number of jobs set by the user on the command line.
                A non-zero value overrides the job server and any -jN
                found in MAKEFLAGS.

        Returns:
            A GNUMakeJobClient configured appropriately or None if there is
            no MAKEFLAGS environment variable (or it is empty).

        Raises:
            SystemExit: With status 0 when MAKEFLAGS requests a dry run.
        """
        if env is None:
            env = os.environ
        makeflags = env.get("MAKEFLAGS")
        if not makeflags:
            return None

        pipe = None
        match = re.search(r"--jobserver-auth=(\d+),(\d+)", makeflags)
        if match:
            if jobs:
                logger.warning(
                    "-jN forced on command line; ignoring GNU make jobserver"
                )
            else:
                read_fd, write_fd = (int(x) for x in match.groups())
                try:
                    # The write end is only inspected when the read end
                    # passed, so at most one mismatch warning is logged.
                    if cls._fd_has_access_mode(
                        read_fd, os.O_RDONLY, "readable"
                    ) and cls._fd_has_access_mode(
                        write_fd, os.O_WRONLY, "writable"
                    ):
                        pipe = [read_fd, write_fd]
                        logger.info("using GNU make jobserver")
                except OSError:
                    logger.warning(
                        "No file descriptors; ignoring GNU make jobserver"
                    )

        if not jobs:
            match = re.search(r"-j(\d+)", makeflags)
            if match:
                jobs = int(match.group(1))
                if jobs == 1:
                    logger.info("Running in sequential mode (-j1)")
        if makeflags[0] == "n":
            logger.info("MAKEFLAGS contained dry-run flag")
            sys.exit(0)
        return cls(pipe, jobs, internal_jobs=1, makeflags=makeflags)

    def get_job(self):
        """Claim a job.

        Blocks until a one-byte token can be read from one of the
        registered pipes.

        Returns:
            A JobHandle object that writes the token back to the pipe it
            came from when released.
        """
        while True:
            for key, _events in self._selector.select():
                try:
                    token = os.read(key.fd, 1)
                except BlockingIOError:
                    # The token reported by select() was gone by the time we
                    # read (e.g. another reader of the pipe took it first);
                    # try the next ready pipe, then wait again.
                    continue
                return JobHandle(functools.partial(os.write, key.data, token))

    def env(self):
        """Get the environment variables necessary to share the job server.

        Returns:
            A dict with the single key "MAKEFLAGS": the value given at
            construction if any, otherwise one built from the job count and
            the inheritable pipe.
        """
        if self._makeflags:
            return {"MAKEFLAGS": self._makeflags}
        flag = ""
        if self.jobs:
            flag += f" -j{self.jobs}"
        if self.jobs != 1 and self._inheritable_pipe is not None:
            flag += f" --jobserver-auth={self._inheritable_pipe[0]},{self._inheritable_pipe[1]}"
        return {"MAKEFLAGS": flag}

    def pass_fds(self):
        """Returns the file descriptors that should be passed to subprocesses.

        Returns:
            A new list with both ends of the inheritable pipe, or an empty
            list in sequential mode (jobs == 1) or when there is no pipe.
        """
        if self.jobs != 1 and self._inheritable_pipe is not None:
            # Hand out a copy so a caller extending the result in place
            # cannot alter the pipe stored on this client.
            return list(self._inheritable_pipe)
        return []
220
221
class GNUMakeJobServer(GNUMakeJobClient):
    """A GNU Make POSIX Job Server that owns its own token pipe.

    See https://www.gnu.org/software/make/manual/html_node/POSIX-Jobserver.html
    for specification.
    """

    def __init__(self, jobs=0):
        """Create the pipe and fill it with one token per job.

        Args:
            jobs: Number of job tokens to offer.  A falsy value selects
                multiprocessing.cpu_count(); a value larger than
                select.PIPE_BUF is capped to select.PIPE_BUF.
        """
        slots = min(jobs, select.PIPE_BUF) if jobs else multiprocessing.cpu_count()
        super().__init__(os.pipe(), slots)

        # Preload the shared pipe: each "+" byte is one claimable job.
        write_end = self._inheritable_pipe[1]
        os.write(write_end, b"+" * slots)
237