maint: update copyright
[gnulib.git] / lib / pipe-filter-ii.c
1 /* Filtering of data through a subprocess.
2    Copyright (C) 2001-2003, 2008-2014 Free Software Foundation, Inc.
3    Written by Bruno Haible <bruno@clisp.org>, 2009.
4
5    This program is free software: you can redistribute it and/or modify
6    it under the terms of the GNU General Public License as published by
7    the Free Software Foundation; either version 3 of the License, or
8    (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful,
11    but WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13    GNU General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program.  If not, see <http://www.gnu.org/licenses/>.  */
17
18 #include <config.h>
19
20 #include "pipe-filter.h"
21
22 #include <errno.h>
23 #include <fcntl.h>
24 #include <stdbool.h>
25 #include <stdint.h>
26 #include <stdlib.h>
27 #include <unistd.h>
28 #if (defined _WIN32 || defined __WIN32__) && ! defined __CYGWIN__
29 # include <windows.h>
30 #else
31 # include <signal.h>
32 # include <sys/select.h>
33 #endif
34
35 #include "error.h"
36 #include "spawn-pipe.h"
37 #include "wait-process.h"
38 #include "gettext.h"
39
40 #define _(str) gettext (str)
41
42 #include "pipe-filter-aux.h"
43
44 #if (defined _WIN32 || defined __WIN32__) && ! defined __CYGWIN__
45
46 struct locals
47 {
48   /* Arguments passed to pipe_filter_ii_execute.  */
49   prepare_write_fn prepare_write;
50   done_write_fn done_write;
51   prepare_read_fn prepare_read;
52   done_read_fn done_read;
53
54   /* Management of the subprocess.  */
55   void *private_data;
56   int fd[2];
57
58   /* Status of the writer part.  */
59   volatile bool writer_terminated;
60   volatile int writer_errno;
61   /* Status of the reader part.  */
62   volatile bool reader_terminated;
63   volatile int reader_errno;
64 };
65
66 static unsigned int WINAPI
67 writer_thread_func (void *thread_arg)
68 {
69   struct locals *l = (struct locals *) thread_arg;
70
71   for (;;)
72     {
73       size_t bufsize;
74       const void *buf = l->prepare_write (&bufsize, l->private_data);
75       if (buf != NULL)
76         {
77           ssize_t nwritten =
78             write (l->fd[1], buf, bufsize > SSIZE_MAX ? SSIZE_MAX : bufsize);
79           if (nwritten < 0)
80             {
81               /* Don't assume that the gnulib modules 'write' and 'sigpipe' are
82                  used.  */
83               if (GetLastError () == ERROR_NO_DATA)
84                 errno = EPIPE;
85               l->writer_errno = errno;
86               break;
87             }
88           else if (nwritten > 0)
89             l->done_write ((void *) buf, nwritten, l->private_data);
90         }
91       else
92         break;
93     }
94
95   l->writer_terminated = true;
96   _endthreadex (0); /* calls ExitThread (0) */
97   abort ();
98 }
99
100 static unsigned int WINAPI
101 reader_thread_func (void *thread_arg)
102 {
103   struct locals *l = (struct locals *) thread_arg;
104
105   for (;;)
106     {
107       size_t bufsize;
108       void *buf = l->prepare_read (&bufsize, l->private_data);
109       if (!(buf != NULL && bufsize > 0))
110         /* prepare_read returned wrong values.  */
111         abort ();
112       {
113         ssize_t nread =
114           read (l->fd[0], buf, bufsize > SSIZE_MAX ? SSIZE_MAX : bufsize);
115         if (nread < 0)
116           {
117             l->reader_errno = errno;
118             break;
119           }
120         else if (nread > 0)
121           l->done_read (buf, nread, l->private_data);
122         else /* nread == 0 */
123           break;
124       }
125     }
126
127   l->reader_terminated = true;
128   _endthreadex (0); /* calls ExitThread (0) */
129   abort ();
130 }
131
132 #endif
133
134 int
135 pipe_filter_ii_execute (const char *progname,
136                         const char *prog_path, const char **prog_argv,
137                         bool null_stderr, bool exit_on_error,
138                         prepare_write_fn prepare_write,
139                         done_write_fn done_write,
140                         prepare_read_fn prepare_read,
141                         done_read_fn done_read,
142                         void *private_data)
143 {
144   pid_t child;
145   int fd[2];
146 #if !((defined _WIN32 || defined __WIN32__) && ! defined __CYGWIN__)
147   struct sigaction orig_sigpipe_action;
148 #endif
149
150   /* Open a bidirectional pipe to a subprocess.  */
151   child = create_pipe_bidi (progname, prog_path, (char **) prog_argv,
152                             null_stderr, true, exit_on_error,
153                             fd);
154   if (child == -1)
155     return -1;
156
157 #if (defined _WIN32 || defined __WIN32__) && ! defined __CYGWIN__
158   /* Native Windows API.  */
159   /* Pipes have a non-blocking mode, see function SetNamedPipeHandleState and
160      the article "Named Pipe Type, Read, and Wait Modes", but Microsoft's
161      documentation discourages its use.  So don't use it.
162      Asynchronous I/O is also not suitable because it notifies the caller only
163      about completion of the I/O request, not about intermediate progress.
164      So do the writing and the reading in separate threads.  */
165   {
166     struct locals l;
167     HANDLE handles[2];
168     #define writer_thread_handle handles[0]
169     #define reader_thread_handle handles[1]
170     bool writer_cleaned_up;
171     bool reader_cleaned_up;
172
173     l.prepare_write = prepare_write;
174     l.done_write = done_write;
175     l.prepare_read = prepare_read;
176     l.done_read = done_read;
177     l.private_data = private_data;
178     l.fd[0] = fd[0];
179     l.fd[1] = fd[1];
180     l.writer_terminated = false;
181     l.writer_errno = 0;
182     l.reader_terminated = false;
183     l.reader_errno = 0;
184
185     writer_thread_handle =
186       (HANDLE) _beginthreadex (NULL, 100000, writer_thread_func, &l, 0, NULL);
187     reader_thread_handle =
188       (HANDLE) _beginthreadex (NULL, 100000, reader_thread_func, &l, 0, NULL);
189     if (writer_thread_handle == NULL || reader_thread_handle == NULL)
190       {
191         if (exit_on_error)
192           error (EXIT_FAILURE, 0, _("creation of threads failed"));
193         if (reader_thread_handle != NULL)
194           CloseHandle (reader_thread_handle);
195         if (writer_thread_handle != NULL)
196           CloseHandle (writer_thread_handle);
197         goto fail;
198       }
199     writer_cleaned_up = false;
200     reader_cleaned_up = false;
201     for (;;)
202       {
203         DWORD ret;
204
205         /* Here !(writer_cleaned_up && reader_cleaned_up).  */
206         if (writer_cleaned_up)
207           ret = WaitForSingleObject (reader_thread_handle, INFINITE);
208         else if (reader_cleaned_up)
209           ret = WaitForSingleObject (writer_thread_handle, INFINITE);
210         else
211           ret = WaitForMultipleObjects (2, handles, FALSE, INFINITE);
212         if (!(ret == WAIT_OBJECT_0 + 0 || ret == WAIT_OBJECT_0 + 1))
213           abort ();
214
215         if (l.writer_terminated)
216           {
217             /* The writer thread has just terminated.  */
218             l.writer_terminated = false;
219             CloseHandle (writer_thread_handle);
220             if (l.writer_errno)
221               {
222                 if (exit_on_error)
223                   error (EXIT_FAILURE, l.writer_errno,
224                          _("write to %s subprocess failed"), progname);
225                 if (!reader_cleaned_up)
226                   {
227                     TerminateThread (reader_thread_handle, 1);
228                     CloseHandle (reader_thread_handle);
229                   }
230                 goto fail;
231               }
232             /* Tell the child there is nothing more the parent will send.  */
233             close (fd[1]);
234             writer_cleaned_up = true;
235           }
236         if (l.reader_terminated)
237           {
238             /* The reader thread has just terminated.  */
239             l.reader_terminated = false;
240             CloseHandle (reader_thread_handle);
241             if (l.reader_errno)
242               {
243                 if (exit_on_error)
244                   error (EXIT_FAILURE, l.reader_errno,
245                          _("read from %s subprocess failed"), progname);
246                 if (!writer_cleaned_up)
247                   {
248                     TerminateThread (writer_thread_handle, 1);
249                     CloseHandle (writer_thread_handle);
250                   }
251                 goto fail;
252               }
253             reader_cleaned_up = true;
254           }
255         if (writer_cleaned_up && reader_cleaned_up)
256           break;
257       }
258   }
259 #else
260   /* When we write to the child process and it has just terminated,
261      we don't want to die from a SIGPIPE signal.  So set the SIGPIPE
262      handler to SIG_IGN, and handle EPIPE error codes in write().  */
263   {
264     struct sigaction sigpipe_action;
265
266     sigpipe_action.sa_handler = SIG_IGN;
267     sigpipe_action.sa_flags = 0;
268     sigemptyset (&sigpipe_action.sa_mask);
269     if (sigaction (SIGPIPE, &sigpipe_action, &orig_sigpipe_action) < 0)
270       abort ();
271   }
272
273   {
274 # if HAVE_SELECT
275     fd_set readfds;  /* All bits except fd[0] are always cleared.  */
276     fd_set writefds; /* All bits except fd[1] are always cleared.  */
277 # endif
278     bool done_writing;
279
280     /* Enable non-blocking I/O.  This permits the read() and write() calls
281        to return -1/EAGAIN without blocking; this is important for polling
282        if HAVE_SELECT is not defined.  It also permits the read() and write()
283        calls to return after partial reads/writes; this is important if
284        HAVE_SELECT is defined, because select() only says that some data
285        can be read or written, not how many.  Without non-blocking I/O,
286        Linux 2.2.17 and BSD systems prefer to block instead of returning
287        with partial results.  */
288     {
289       int fcntl_flags;
290
291       if ((fcntl_flags = fcntl (fd[1], F_GETFL, 0)) < 0
292           || fcntl (fd[1], F_SETFL, fcntl_flags | O_NONBLOCK) == -1
293           || (fcntl_flags = fcntl (fd[0], F_GETFL, 0)) < 0
294           || fcntl (fd[0], F_SETFL, fcntl_flags | O_NONBLOCK) == -1)
295         {
296           if (exit_on_error)
297             error (EXIT_FAILURE, errno,
298                    _("cannot set up nonblocking I/O to %s subprocess"),
299                    progname);
300           goto fail;
301         }
302     }
303
304 # if HAVE_SELECT
305     FD_ZERO (&readfds);
306     FD_ZERO (&writefds);
307 # endif
308     done_writing = false;
309     for (;;)
310       {
311 # if HAVE_SELECT
312         int n;
313
314         FD_SET (fd[0], &readfds);
315         n = fd[0] + 1;
316         if (!done_writing)
317           {
318             FD_SET (fd[1], &writefds);
319             if (n <= fd[1])
320               n = fd[1] + 1;
321           }
322
323         n = select (n, &readfds, (!done_writing ? &writefds : NULL), NULL,
324                     NULL);
325         if (n < 0)
326           {
327             if (exit_on_error)
328               error (EXIT_FAILURE, errno,
329                      _("communication with %s subprocess failed"), progname);
330             goto fail;
331           }
332         if (!done_writing && FD_ISSET (fd[1], &writefds))
333           goto try_write;
334         if (FD_ISSET (fd[0], &readfds))
335           goto try_read;
336         /* How could select() return if none of the two descriptors is ready?  */
337         abort ();
338 # endif
339
340         /* Attempt to write.  */
341 # if HAVE_SELECT
342       try_write:
343 # endif
344         if (!done_writing)
345           {
346             size_t bufsize;
347             const void *buf = prepare_write (&bufsize, private_data);
348             if (buf != NULL)
349               {
350                 /* Writing to a pipe in non-blocking mode is tricky: The
351                    write() call may fail with EAGAIN, simply because sufficient
352                    space is not available in the pipe. See POSIX:2008
353                    <http://pubs.opengroup.org/onlinepubs/9699919799/functions/write.html>.
354                    This happens actually on AIX and IRIX, when bufsize >= 8192
355                    (even though PIPE_BUF and pathconf ("/", _PC_PIPE_BUF) are
356                    both 32768).  */
357                 size_t attempt_to_write =
358                   (bufsize > SSIZE_MAX ? SSIZE_MAX : bufsize);
359                 for (;;)
360                   {
361                     ssize_t nwritten = write (fd[1], buf, attempt_to_write);
362                     if (nwritten < 0)
363                       {
364                         if (errno == EAGAIN)
365                           {
366                             attempt_to_write = attempt_to_write / 2;
367                             if (attempt_to_write == 0)
368                               break;
369                           }
370                         else if (!IS_EAGAIN (errno))
371                           {
372                             if (exit_on_error)
373                               error (EXIT_FAILURE, errno,
374                                      _("write to %s subprocess failed"),
375                                      progname);
376                             goto fail;
377                           }
378                       }
379                     else
380                       {
381                         if (nwritten > 0)
382                           done_write ((void *) buf, nwritten, private_data);
383                         break;
384                       }
385                   }
386               }
387             else
388               {
389                 /* Tell the child there is nothing more the parent will send.  */
390                 close (fd[1]);
391                 done_writing = true;
392               }
393           }
394 # if HAVE_SELECT
395         continue;
396 # endif
397
398         /* Attempt to read.  */
399 # if HAVE_SELECT
400       try_read:
401 # endif
402         {
403           size_t bufsize;
404           void *buf = prepare_read (&bufsize, private_data);
405           if (!(buf != NULL && bufsize > 0))
406             /* prepare_read returned wrong values.  */
407             abort ();
408           {
409             ssize_t nread =
410               read (fd[0], buf, bufsize > SSIZE_MAX ? SSIZE_MAX : bufsize);
411             if (nread < 0)
412               {
413                 if (!IS_EAGAIN (errno))
414                   {
415                     if (exit_on_error)
416                       error (EXIT_FAILURE, errno,
417                              _("read from %s subprocess failed"), progname);
418                     goto fail;
419                   }
420               }
421             else if (nread > 0)
422               done_read (buf, nread, private_data);
423             else /* nread == 0 */
424               {
425                 if (done_writing)
426                   break;
427               }
428           }
429         }
430 # if HAVE_SELECT
431         continue;
432 # endif
433       }
434   }
435
436   /* Restore SIGPIPE signal handler.  */
437   if (sigaction (SIGPIPE, &orig_sigpipe_action, NULL) < 0)
438     abort ();
439 #endif
440
441   close (fd[0]);
442
443   /* Remove zombie process from process list.  */
444   {
445     int exitstatus =
446       wait_subprocess (child, progname, false, null_stderr,
447                        true, exit_on_error, NULL);
448     if (exitstatus != 0 && exit_on_error)
449       error (EXIT_FAILURE, 0, _("%s subprocess terminated with exit code %d"),
450              progname, exitstatus);
451     return exitstatus;
452   }
453
454  fail:
455   {
456     int saved_errno = errno;
457     close (fd[1]);
458 #if !((defined _WIN32 || defined __WIN32__) && ! defined __CYGWIN__)
459     if (sigaction (SIGPIPE, &orig_sigpipe_action, NULL) < 0)
460       abort ();
461 #endif
462     close (fd[0]);
463     wait_subprocess (child, progname, true, true, true, false, NULL);
464     errno = saved_errno;
465     return -1;
466   }
467 }