maint: update copyright
[gnulib.git] / lib / pipe-filter-gi.c
1 /* Filtering of data through a subprocess.
2    Copyright (C) 2001-2003, 2008-2014 Free Software Foundation, Inc.
3    Written by Paolo Bonzini <bonzini@gnu.org>, 2009,
4    and Bruno Haible <bruno@clisp.org>, 2009.
5
6    This program is free software: you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program.  If not, see <http://www.gnu.org/licenses/>.  */
18
19 #include <config.h>
20
21 #include "pipe-filter.h"
22
23 #include <errno.h>
24 #include <fcntl.h>
25 #include <stdbool.h>
26 #include <stdint.h>
27 #include <stdlib.h>
28 #include <unistd.h>
29 #if (defined _WIN32 || defined __WIN32__) && ! defined __CYGWIN__
30 # include <windows.h>
31 #else
32 # include <signal.h>
33 # include <sys/select.h>
34 #endif
35
36 #include "error.h"
37 #include "spawn-pipe.h"
38 #include "wait-process.h"
39 #include "xalloc.h"
40 #include "gettext.h"
41
42 #define _(str) gettext (str)
43
44 #include "pipe-filter-aux.h"
45
46 struct pipe_filter_gi
47 {
48   /* Arguments passed to pipe_filter_gi_create.  */
49   const char *progname;
50   bool null_stderr;
51   bool exit_on_error;
52   prepare_read_fn prepare_read;
53   done_read_fn done_read;
54   void *private_data;
55
56   /* Management of the subprocess.  */
57   pid_t child;
58   int fd[2];
59   bool exited;
60   int exitstatus;
61
62   /* Status of the writer part.  */
63   volatile bool writer_terminated;
64   int writer_errno;
65   /* Status of the reader part.  */
66   volatile bool reader_terminated;
67   volatile int reader_errno;
68
69 #if (defined _WIN32 || defined __WIN32__) && ! defined __CYGWIN__
70   CRITICAL_SECTION lock; /* protects the volatile fields */
71   HANDLE reader_thread_handle;
72 #else
73   struct sigaction orig_sigpipe_action;
74   fd_set readfds;  /* All bits except fd[0] are always cleared.  */
75   fd_set writefds; /* All bits except fd[1] are always cleared.  */
76 #endif
77 };
78
79
80 /* Platform dependent functions.  */
81
82 /* Perform additional initializations.
83    Return 0 if successful, -1 upon failure.  */
84 static int filter_init (struct pipe_filter_gi *filter);
85
86 /* Write count bytes starting at buf, while at the same time invoking the
87    read iterator (the functions prepare_read/done_read) when needed.  */
88 static void filter_loop (struct pipe_filter_gi *filter,
89                          const char *wbuf, size_t count);
90
91 /* Perform cleanup actions at the end.
92    finish_reading is true if there was no error, or false if some error
93    occurred already.  */
94 static void filter_cleanup (struct pipe_filter_gi *filter,
95                             bool finish_reading);
96
97
98 #if (defined _WIN32 || defined __WIN32__) && ! defined __CYGWIN__
99 /* Native Windows API.  */
100
101 static unsigned int WINAPI
102 reader_thread_func (void *thread_arg)
103 {
104   struct pipe_filter_gi *filter = (struct pipe_filter_gi *) thread_arg;
105
106   for (;;)
107     {
108       size_t bufsize;
109       void *buf = filter->prepare_read (&bufsize, filter->private_data);
110       if (!(buf != NULL && bufsize > 0))
111         /* prepare_read returned wrong values.  */
112         abort ();
113       {
114         ssize_t nread =
115           read (filter->fd[0], buf, bufsize > SSIZE_MAX ? SSIZE_MAX : bufsize);
116         EnterCriticalSection (&filter->lock);
117         /* If the writer already encountered an error, terminate.  */
118         if (filter->writer_terminated)
119           break;
120         if (nread < 0)
121           {
122             filter->reader_errno = errno;
123             break;
124           }
125         else if (nread > 0)
126           filter->done_read (buf, nread, filter->private_data);
127         else /* nread == 0 */
128           break;
129         LeaveCriticalSection (&filter->lock);
130       }
131     }
132
133   filter->reader_terminated = true;
134   LeaveCriticalSection (&filter->lock);
135   _endthreadex (0); /* calls ExitThread (0) */
136   abort ();
137 }
138
139 static int
140 filter_init (struct pipe_filter_gi *filter)
141 {
142   InitializeCriticalSection (&filter->lock);
143   EnterCriticalSection (&filter->lock);
144
145   filter->reader_thread_handle =
146     (HANDLE) _beginthreadex (NULL, 100000, reader_thread_func, filter,
147                              0, NULL);
148
149   if (filter->reader_thread_handle == NULL)
150     {
151       if (filter->exit_on_error)
152         error (EXIT_FAILURE, 0, _("creation of reading thread failed"));
153       return -1;
154     }
155   else
156     return 0;
157 }
158
159 static void
160 filter_loop (struct pipe_filter_gi *filter, const char *wbuf, size_t count)
161 {
162   if (!filter->writer_terminated)
163     {
164       for (;;)
165         {
166           ssize_t nwritten;
167
168           /* Allow the reader thread to continue.  */
169           LeaveCriticalSection (&filter->lock);
170
171           nwritten =
172             write (filter->fd[1], wbuf, count > SSIZE_MAX ? SSIZE_MAX : count);
173
174           /* Get the lock back from the reader thread.  */
175           EnterCriticalSection (&filter->lock);
176
177           if (nwritten < 0)
178             {
179               /* Don't assume that the gnulib modules 'write' and 'sigpipe' are
180                  used.  */
181               if (GetLastError () == ERROR_NO_DATA)
182                 errno = EPIPE;
183               filter->writer_errno = errno;
184               filter->writer_terminated = true;
185               break;
186             }
187           else if (nwritten > 0)
188             {
189               count -= nwritten;
190               if (count == 0)
191                 break;
192               wbuf += nwritten;
193             }
194           else /* nwritten == 0 */
195             {
196               filter->writer_terminated = true;
197               break;
198             }
199         }
200     }
201 }
202
203 static void
204 filter_cleanup (struct pipe_filter_gi *filter, bool finish_reading)
205 {
206   if (finish_reading)
207     {
208       LeaveCriticalSection (&filter->lock);
209       WaitForSingleObject (filter->reader_thread_handle, INFINITE);
210     }
211   else
212     TerminateThread (filter->reader_thread_handle, 1);
213
214   CloseHandle (filter->reader_thread_handle);
215   DeleteCriticalSection (&filter->lock);
216 }
217
218 #else
219 /* Unix API.  */
220
221 static int
222 filter_init (struct pipe_filter_gi *filter)
223 {
224 #if !((defined _WIN32 || defined __WIN32__) && ! defined __CYGWIN__)
225   /* When we write to the child process and it has just terminated,
226      we don't want to die from a SIGPIPE signal.  So set the SIGPIPE
227      handler to SIG_IGN, and handle EPIPE error codes in write().  */
228   {
229     struct sigaction sigpipe_action;
230
231     sigpipe_action.sa_handler = SIG_IGN;
232     sigpipe_action.sa_flags = 0;
233     sigemptyset (&sigpipe_action.sa_mask);
234     if (sigaction (SIGPIPE, &sigpipe_action, &filter->orig_sigpipe_action) < 0)
235       abort ();
236   }
237 #endif
238
239   /* Enable non-blocking I/O.  This permits the read() and write() calls
240      to return -1/EAGAIN without blocking; this is important for polling
241      if HAVE_SELECT is not defined.  It also permits the read() and write()
242      calls to return after partial reads/writes; this is important if
243      HAVE_SELECT is defined, because select() only says that some data
244      can be read or written, not how many.  Without non-blocking I/O,
245      Linux 2.2.17 and BSD systems prefer to block instead of returning
246      with partial results.  */
247   {
248     int fcntl_flags;
249
250     if ((fcntl_flags = fcntl (filter->fd[1], F_GETFL, 0)) < 0
251         || fcntl (filter->fd[1], F_SETFL, fcntl_flags | O_NONBLOCK) == -1
252         || (fcntl_flags = fcntl (filter->fd[0], F_GETFL, 0)) < 0
253         || fcntl (filter->fd[0], F_SETFL, fcntl_flags | O_NONBLOCK) == -1)
254       {
255         if (filter->exit_on_error)
256           error (EXIT_FAILURE, errno,
257                  _("cannot set up nonblocking I/O to %s subprocess"),
258                  filter->progname);
259         return -1;
260       }
261   }
262
263   FD_ZERO (&filter->readfds);
264   FD_ZERO (&filter->writefds);
265
266   return 0;
267 }
268
269 static void
270 filter_loop (struct pipe_filter_gi *filter, const char *wbuf, size_t count)
271 {
272   /* This function is used in two situations:
273      - in order to write some data to the subprocess
274        [done_writing = false],
275      - in order to read the remaining data after everything was written
276        [done_writing = true].  In this case buf is NULL and count is
277        ignored.  */
278   bool done_writing = (wbuf == NULL);
279
280   if (!done_writing)
281     {
282       if (filter->writer_terminated || filter->reader_terminated)
283         /* pipe_filter_gi_write was called when it should not be.  */
284         abort ();
285     }
286   else
287     {
288       if (filter->reader_terminated)
289         return;
290     }
291
292   /* Loop, trying to write the given buffer or reading, whichever is
293      possible.  */
294   for (;;)
295     {
296       /* Here filter->writer_terminated is false.  When it becomes true, this
297          loop is terminated.  */
298       /* Whereas filter->reader_terminated is initially false but may become
299          true during this loop.  */
300       /* Here, if !done_writing, count > 0.  When count becomes 0, this loop
301          is terminated.  */
302       /* Here, if done_writing, filter->reader_terminated is false.  When
303          filter->reader_terminated becomes true, this loop is terminated.  */
304 # if HAVE_SELECT
305       int n;
306
307       /* See whether reading or writing is possible.  */
308       n = 1;
309       if (!filter->reader_terminated)
310         {
311           FD_SET (filter->fd[0], &filter->readfds);
312           n = filter->fd[0] + 1;
313         }
314       if (!done_writing)
315         {
316           FD_SET (filter->fd[1], &filter->writefds);
317           if (n <= filter->fd[1])
318             n = filter->fd[1] + 1;
319         }
320       n = select (n,
321                   (!filter->reader_terminated ? &filter->readfds : NULL),
322                   (!done_writing ? &filter->writefds : NULL),
323                   NULL, NULL);
324
325       if (n < 0)
326         {
327           if (filter->exit_on_error)
328             error (EXIT_FAILURE, errno,
329                    _("communication with %s subprocess failed"),
330                    filter->progname);
331           filter->writer_errno = errno;
332           filter->writer_terminated = true;
333           break;
334         }
335
336       if (!done_writing && FD_ISSET (filter->fd[1], &filter->writefds))
337         goto try_write;
338       if (!filter->reader_terminated
339           && FD_ISSET (filter->fd[0], &filter->readfds))
340         goto try_read;
341       /* How could select() return if none of the two descriptors is ready?  */
342       abort ();
343 # endif
344
345       /* Attempt to write.  */
346 # if HAVE_SELECT
347     try_write:
348 # endif
349       if (!done_writing)
350         {
351           ssize_t nwritten =
352             write (filter->fd[1], wbuf, count > SSIZE_MAX ? SSIZE_MAX : count);
353           if (nwritten < 0)
354             {
355               if (!IS_EAGAIN (errno))
356                 {
357                   if (filter->exit_on_error)
358                     error (EXIT_FAILURE, errno,
359                            _("write to %s subprocess failed"),
360                            filter->progname);
361                   filter->writer_errno = errno;
362                   filter->writer_terminated = true;
363                   break;
364                 }
365             }
366           else if (nwritten > 0)
367             {
368               count -= nwritten;
369               if (count == 0)
370                 break;
371               wbuf += nwritten;
372             }
373         }
374 # if HAVE_SELECT
375       continue;
376 # endif
377
378       /* Attempt to read.  */
379 # if HAVE_SELECT
380     try_read:
381 # endif
382       if (!filter->reader_terminated)
383         {
384           size_t bufsize;
385           void *buf = filter->prepare_read (&bufsize, filter->private_data);
386           if (!(buf != NULL && bufsize > 0))
387             /* prepare_read returned wrong values.  */
388             abort ();
389           {
390             ssize_t nread =
391               read (filter->fd[0], buf,
392                     bufsize > SSIZE_MAX ? SSIZE_MAX : bufsize);
393             if (nread < 0)
394               {
395                 if (!IS_EAGAIN (errno))
396                   {
397                     if (filter->exit_on_error)
398                       error (EXIT_FAILURE, errno,
399                              _("read from %s subprocess failed"),
400                              filter->progname);
401                     filter->reader_errno = errno;
402                     filter->reader_terminated = true;
403                     break;
404                   }
405               }
406             else if (nread > 0)
407               filter->done_read (buf, nread, filter->private_data);
408             else /* nread == 0 */
409               {
410                 filter->reader_terminated = true;
411                 if (done_writing)
412                   break;
413               }
414           }
415       }
416 # if HAVE_SELECT
417       continue;
418 # endif
419     }
420 }
421
422 static void
423 filter_cleanup (struct pipe_filter_gi *filter, bool finish_reading)
424 {
425   if (finish_reading)
426     /* A select loop, with done_writing = true.  */
427     filter_loop (filter, NULL, 0);
428
429   if (sigaction (SIGPIPE, &filter->orig_sigpipe_action, NULL) < 0)
430     abort ();
431 }
432
433 #endif
434
435
436 /* Terminate the child process.  Do nothing if it already exited.  */
437 static void
438 filter_terminate (struct pipe_filter_gi *filter)
439 {
440   if (!filter->exited)
441     {
442       /* Tell the child there is nothing more the parent will send.  */
443       close (filter->fd[1]);
444       filter_cleanup (filter, !filter->reader_terminated);
445       close (filter->fd[0]);
446       filter->exitstatus =
447         wait_subprocess (filter->child, filter->progname, true,
448                          filter->null_stderr, true, filter->exit_on_error,
449                          NULL);
450       if (filter->exitstatus != 0 && filter->exit_on_error)
451         error (EXIT_FAILURE, 0,
452                _("subprocess %s terminated with exit code %d"),
453                filter->progname, filter->exitstatus);
454       filter->exited = true;
455     }
456 }
457
458 /* After filter_terminate:
459    Return 0 upon success, or (only if exit_on_error is false):
460    - -1 with errno set upon failure,
461    - the positive exit code of the subprocess if that failed.  */
462 static int
463 filter_retcode (struct pipe_filter_gi *filter)
464 {
465   if (filter->writer_errno != 0)
466     {
467       errno = filter->writer_errno;
468       return -1;
469     }
470   else if (filter->reader_errno != 0)
471     {
472       errno = filter->reader_errno;
473       return -1;
474     }
475   else
476     return filter->exitstatus;
477 }
478
479 struct pipe_filter_gi *
480 pipe_filter_gi_create (const char *progname,
481                        const char *prog_path, const char **prog_argv,
482                        bool null_stderr, bool exit_on_error,
483                        prepare_read_fn prepare_read,
484                        done_read_fn done_read,
485                        void *private_data)
486 {
487   struct pipe_filter_gi *filter;
488
489   filter =
490     (struct pipe_filter_gi *) xmalloc (sizeof (struct pipe_filter_gi));
491
492   /* Open a bidirectional pipe to a subprocess.  */
493   filter->child = create_pipe_bidi (progname, prog_path, (char **) prog_argv,
494                                     null_stderr, true, exit_on_error,
495                                     filter->fd);
496   filter->progname = progname;
497   filter->null_stderr = null_stderr;
498   filter->exit_on_error = exit_on_error;
499   filter->prepare_read = prepare_read;
500   filter->done_read = done_read;
501   filter->private_data = private_data;
502   filter->exited = false;
503   filter->exitstatus = 0;
504   filter->writer_terminated = false;
505   filter->writer_errno = 0;
506   filter->reader_terminated = false;
507   filter->reader_errno = 0;
508
509   if (filter->child == -1)
510     {
511       /* Child process could not be created.
512          Arrange for filter_retcode (filter) to be the current errno.  */
513       filter->writer_errno = errno;
514       filter->writer_terminated = true;
515       filter->exited = true;
516     }
517   else if (filter_init (filter) < 0)
518     filter_terminate (filter);
519
520   return filter;
521 }
522
523 int
524 pipe_filter_gi_write (struct pipe_filter_gi *filter,
525                       const void *buf, size_t size)
526 {
527   if (buf == NULL)
528     /* Invalid argument.  */
529     abort ();
530
531   if (filter->exited)
532     return filter_retcode (filter);
533
534   if (size > 0)
535     {
536       filter_loop (filter, buf, size);
537       if (filter->writer_terminated || filter->reader_terminated)
538         {
539           filter_terminate (filter);
540           return filter_retcode (filter);
541         }
542     }
543   return 0;
544 }
545
546 int
547 pipe_filter_gi_close (struct pipe_filter_gi *filter)
548 {
549   int ret;
550   int saved_errno;
551
552   filter_terminate (filter);
553   ret = filter_retcode (filter);
554   saved_errno = errno;
555   free (filter);
556   errno = saved_errno;
557   return ret;
558 }