Hash: 60192b46
Author:
Date: 2006-02-26T20:18:35
Improved (well, completely rewritten) rtsig support by Mathew Mills; fix some cases where regress would not pass on Linux. svn:r204
/*
* Copyright (c) 2006 Mathew Mills <mathewmills@mac.com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* 3. The name of the author may not be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
* IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
* NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
* THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
/*
* Meta-level comments: You know that a kernel interface is wrong if
* supporting it requires three times more code than any of the other
* kernel interfaces supported in libevent. Niels - 2006-02-22
*/
/**
"RTSIG" is a shorthand for using O_ASYNC to make descriptors send
signals when readable/writable and to use POSIX real-time signals
witch are queued unlike normal signals. At first blush this may
seem like a alternative to epoll, but a number of problems arise
when attempting to build an eventloop entirely out of rtsig.
Still, we can use rtsig in combination with poll() to
provide an eventloop that allows for many thousands of sockets
without huge overheads implicit with using select() or poll()
alone. epoll and kqueue are far superior to rtsig and should be
used where available, but rtsig has been in standard Linux kernels
for a long time and have a huge installation base. epoll requires
special patches for 2.4 kernels and 2.6 kernels are not yet nearly
so ubiquitous.
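As a concrete sketch of the mechanism ( this mirrors what
rtsig_add() below actually does ), arming a socket to deliver a
queued real-time signal looks roughly like this, where signo is
assumed to be a free signal in the SIGRTMIN..SIGRTMAX range:

    int flags = fcntl(fd, F_GETFL);
    fcntl(fd, F_SETSIG, signo);           // deliver signo, not SIGIO
    fcntl(fd, F_SETOWN, getpid());        // queue signals to this process
    fcntl(fd, F_SETFL, flags | O_ASYNC);  // enable signal-driven I/O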
rtsig problems:
- O_ASYNC mechanisms work only on sockets - not pipes or tty's
- O_ASYNC signals are edge-triggered: POLLIN on a packet arriving
or a socket close; POLLOUT when a socket transitions from
non-writable to writable. Being edge-triggered means the
event-handler callbacks must clear the level ( reading the
socket buffer contents completely ) or they will be unable to
reliably receive notification again ( see the sketch below ).
- rtsig implementations must be intimately involved in how a
process dispatches signals.
- delivering signals per-event can be expensive, CPU-wise, and
sigtimedwait() blocks on signals only, which means non-sockets
cannot be serviced.
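A sketch of such an edge-clearing read callback, assuming a
non-blocking socket and a hypothetical consume() handler:

    char buf[4096];
    for (;;) {
        ssize_t n = recv(fd, buf, sizeof(buf), 0);
        if (n > 0)
            consume(buf, n);    // keep reading; more may be queued
        else if (n == -1 && errno == EAGAIN)
            break;              // level cleared, edge re-armed
        else
            break;              // EOF or real error
    }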
Theory of operation:
This libevent module uses rtsig to allow us to manage a set of
poll-event descriptors. We can drop uninteresting fd's from the
pollset if the fd will send a signal when it becomes interesting
again.
poll() offers us level-triggering and, when we have verified the
level of a socket, we can trust the edge-trigger nature of the
ASYNC signal.
As an eventloop we must poll for external events but leverage
kernel functionality to sleep between events ( until the loop's
next scheduled timed event ).
If we are polling on any non-sockets then we simply have no choice
but to block in the poll() call. If we blocked in the
sigtimedwait() call as the rtsig papers recommend, we would not
wake on non-socket state transitions. As part of libevent, this
module must support non-socket polling.
Many applications, however, do not need to poll on non-sockets and
so this module should be able to optimize this case by using
sigtimedwait(). For this reason this module can actually trigger
events in each of three different ways:
- poll() returning ready events from descriptors in the pollset
- real-time signals dequeued via sigtimedwait()
- real-time signals that call an installed signal handler which in
turn writes the contents of siginfo to one end of a socketpair
DGRAM socket. The other end of the socket is always in the
pollset so poll will be guaranteed to return even if the signal is
received before entering poll().
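The third path, in sketch form ( signal_handler() below is the real
implementation ), is just a handler relaying siginfo into the send
end of the datagram socketpair:

    static void handler(int sig, siginfo_t *info, void *ctx)
    {
        // send_fd stands in for the send end of the socketpair
        send(send_fd, info, sizeof(*info), MSG_DONTWAIT | MSG_NOSIGNAL);
    }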
Non-socket descriptors force us to block on the poll() for the
duration of a dispatch. In this case we unblock ( with sigprocmask )
the managed signals just before polling. Each managed signal is
handled by signal_handler(), which send()s the contents of siginfo
over the socketpair. Otherwise, we call poll() with a timeout of
0ms so it checks the levels of the fd's in the pollset and returns
immediately. Any fd that is a socket and has no active state is
removed from the pollset for the next pass -- we will rely on
getting a signal for events on these fd's.
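In sketch form, the blocking case brackets the poll() with
sigprocmask() calls ( do_poll() below does exactly this when
op->nonsock is non-zero; managed_sigs stands in for op->sigs ):

    sigprocmask(SIG_UNBLOCK, &managed_sigs, NULL);
    res = poll(pollset, nfds, timeout_ms);  // handlers may fire here
    sigprocmask(SIG_BLOCK, &managed_sigs, NULL);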
The receiving end of the siginfo socketpair is in the pollset
( permanently ), so if we are polling on non-sockets, the delivery
of signals immediately following sigprocmask(SIG_UNBLOCK, ...) will
result in a readable op->signal_recv_fd, which ensures the poll()
will return immediately. If the poll() call is blocking and a
signal arrives ( possibly a real-time signal from a socket not in
the pollset ), its handler will write the data to the socketpair
and interrupt the poll().
After every poll call we attempt a non-blocking recv from the
signal_recv_fd and continue to recv and dispatch the events until
recv indicates the socket buffer is empty.
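That recv-loop, in sketch form ( do_signals_from_socket() below is
the real implementation; each datagram is one whole siginfo_t, and
the receiving fd is non-blocking ):

    siginfo_t info;
    while (recv(signal_recv_fd, &info, sizeof(info),
        MSG_NOSIGNAL) == sizeof(info))
        dispatch(&info);    // hypothetical; see do_siginfo_dispatch()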
One might raise concerns about receiving event activations from
both poll() and from the rtsig data in the signal_recv_fd.
Fortunately, libevent is already structured for event coalescing,
so this issue is mitigated ( though we do some work twice for the
same event, making us less efficient ). I suspect that the cost of
turning off the O_ASYNC flag on fd's in the pollset is higher than
that of handling some events twice. Looking at the kernel's source
code for setting O_ASYNC, it appears to take a global kernel
lock...
After a poll and recv-loop for the signal_recv_fd, we finally do a
sigtimedwait(). sigtimedwait will only block if we haven't
blocked in poll() and we have not enqueued events from either the
poll or the recv-loop. Because sigtimedwait blocks all signals
that are not in the set of signals to be dequeued, we need to
dequeue almost all signals and make sure we dispatch them
correctly. We dequeue any signal that is not blocked, as well as
all libevent-managed signals. If we get a signal that is not
managed by libevent, we look up the sigaction for that specific
signal and call the handler ourselves.
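Building that dequeue set means complementing the current signal
mask and OR-ing in the managed signals, roughly ( rtsig_dispatch()
below does this with signotset() and the GNU sigorset() ):

    sigset_t sigs;
    sigemptyset(&sigs);
    sigprocmask(SIG_BLOCK, &sigs, &sigs);  // fetch the current mask
    signotset(&sigs);                      // everything not blocked...
    sigorset(&sigs, &sigs, &op->sigs);     // ...plus managed signals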
Finally, I should mention that getting a SIGIO signal indicates
that the rtsig buffer has overflowed and we have lost events.
This forces us to add _every_ descriptor to the pollset to recover.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
/* Enable F_SETSIG and F_SETOWN */
#define _GNU_SOURCE
#include <sys/types.h>
#ifdef HAVE_SYS_TIME_H
#include <sys/time.h>
#else
#include <sys/_time.h>
#endif
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/poll.h>
#include <sys/queue.h>
#include <sys/stat.h>	/* fstat() and S_ISSOCK() in rtsig_add() */
#include <sys/tree.h>
#include <unistd.h>
#include <sys/socket.h>
#include "event.h"
#include "event-internal.h"
#include "log.h"
extern struct event_list signalqueue;
#include <linux/unistd.h>
#ifndef __NR_gettid
#define gettid() getpid()
#else
#if ((__GLIBC__ > 2) || ((__GLIBC__ == 2) && (__GLIBC_MINOR__ >= 3)))
_syscall0(pid_t,gettid)
#endif
#endif
#define EVLIST_NONSOCK 0x1000 /* event is for a non-socket file-descriptor */
#define EVLIST_DONTDEL 0x2000 /* event should always be in the pollset */
#define MAXBUFFERSIZE (1024 * 1024 * 2) /* max socketbuffer for signal-spair */
#define INIT_MAX 16 /* init/min # of fd positions in our pollset */
static int signal_send_fd[_NSIG]; /* the global send end of the signal socketpair */
static int trouble[_NSIG]; /* 1 when the signal-handler can't send to signal_send_fd */
struct rtdata;
TAILQ_HEAD(rtdata_list, rtdata);
struct rtsigop {
sigset_t sigs; /* signal mask for all _managed_ signals */
struct pollfd *poll; /* poll structures */
struct rtdata **ptodat; /* map poll_position to rtdata */
int cur; /* cur # fd's in a poll set */
int max; /* max # fd's in a poll set, start at 16 and grow as needed */
int total; /* count of fd's we are watching now */
int signo; /* the signo we use for ASYNC fd notifications */
int nonsock; /* number of non-socket fd's we are watching */
int highestfd; /* highest fd accommodated by fdtodat */
struct rtdata_list **fdtodat; /* map fd to rtdata ( and thus to event ) */
int signal_recv_fd; /* recv side of the signal socketpair */
int signal_send_fd; /* send side of the signal socketpair */
struct event sigfdev; /* our own event structure for the signal fd */
};
struct rtdata {
/* rtdata holds rtsig-private state on each event */
TAILQ_ENTRY (rtdata) next;
struct event *ev;
int poll_position;
};
void *rtsig_init(void);
int rtsig_add(void *, struct event *);
int rtsig_del(void *, struct event *);
int rtsig_recalc(struct event_base *, void *, int);
int rtsig_dispatch(struct event_base *, void *, struct timeval *);
struct eventop rtsigops = {
"rtsig",
rtsig_init,
rtsig_add,
rtsig_del,
rtsig_recalc,
rtsig_dispatch
};
static void
signal_handler(int sig, siginfo_t *info, void *ctx)
{
/*
 * the signal handler for all libevent-managed signals; only
 * used if we need to do a blocking poll() call because of
 * non-socket fd's in the pollset
 */
siginfo_t *i = info;
siginfo_t i_local;
if (trouble[sig - 1]) {
	/*
	 * the last send() failed; synthesize a SIGIO so the
	 * dispatcher falls back to repolling everything
	 */
	memset(&i_local, 0, sizeof(i_local));
	i_local.si_signo = SIGIO;
	i_local.si_errno = 0;
	i_local.si_code = 0;
	i = &i_local;
	trouble[sig - 1] = 0;
}
if (send(signal_send_fd[sig - 1], i, sizeof(*i),
MSG_DONTWAIT|MSG_NOSIGNAL) == -1)
trouble[sig - 1] = 1;
}
static void
donothing(int fd, short event, void *arg)
{
/*
* callback for our signal_recv_fd event structure
* we don't want to act on these events, we just want to wake the poll()
*/
}
static void
signotset(sigset_t *set)
{
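/*
 * complement every bit of the mask in place; this walks the
 * sigset_t as an array of 4-byte words and assumes
 * sizeof(unsigned) == 4
 */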
int i, l;
l = sizeof(*set) / 4;
for (i = 0; i < l; i++) {
((unsigned *)set)[i] = ~((unsigned *)set)[i];
}
}
/* The next three functions manage our private data about each event struct */
static int
grow_fdset(struct rtsigop *op, int newhigh)
{
/*
* grow the fd -> rtdata array because we have encountered a
* new fd too high to fit in the existing array
*/
struct rtdata_list **p;
struct rtdata_list *datset;
int i,x;
int newcnt = (newhigh + 1) << 1;
if (newhigh <= op->highestfd)
return (0);
p = realloc(op->fdtodat, sizeof(struct rtdata_list *) * newcnt);
if (p == NULL)
return (-1);
op->fdtodat = p;
datset = calloc(newcnt - (op->highestfd + 1),
sizeof(struct rtdata_list));
if (datset == NULL)
return (-1);
for (i = op->highestfd + 1, x = 0; i < newcnt; i++, x++) {
op->fdtodat[i] = &(datset[x]);
TAILQ_INIT(op->fdtodat[i]);
}
op->highestfd = newcnt - 1;
return (0);
}
static struct rtdata *
ev2dat(struct rtsigop *op, struct event *ev, int create)
{
/*
 * given an event struct, find the rtdata structure that
 * corresponds to it; if create is non-zero and the rtdata
 * structure does not exist, create it; return NULL if it is
 * not found or cannot be allocated
 */
int found = 0;
int fd = ev->ev_fd;
struct rtdata *ret = NULL;
if (op->highestfd < fd) {
	/* never index fdtodat beyond highestfd */
	if (!create)
		return (NULL);
	if (grow_fdset(op, fd) == -1)
		return (NULL);
}
TAILQ_FOREACH(ret, op->fdtodat[fd], next) {
if (ret->ev == ev) {
found = 1;
break;
}
}
if (!found) {
if (!create)
return (NULL);
ret = calloc(1, sizeof(struct rtdata));
if (ret == NULL)
return (NULL);
ret->ev = ev;
ret->poll_position = -1;
TAILQ_INSERT_TAIL(op->fdtodat[fd], ret, next);
}
return (ret);
}
static void
dat_del(struct rtsigop *op, struct rtdata *dat)
{
/*
* delete our private notes about a given event struct
* called from rtsig_del() only
*/
int fd;
if (dat == NULL)
return;
fd = dat->ev->ev_fd;
TAILQ_REMOVE(op->fdtodat[fd], dat, next);
memset(dat, 0, sizeof(*dat));
free(dat);
}
static void
set_sigaction(int sig)
{
/*
* set the standard handler for any libevent-managed signal,
* including the rtsig used for O_ASYNC notifications
*/
struct sigaction act;
act.sa_flags = SA_RESTART | SA_SIGINFO;
sigfillset(&(act.sa_mask));
act.sa_sigaction = &signal_handler;
sigaction(sig, &act, NULL);
}
static int
find_rt_signal(void)
{
/* find an unused rtsignal */
struct sigaction act;
int sig = SIGRTMIN;
while (sig <= SIGRTMAX) {
if (sigaction(sig, NULL, &act) != 0) {
if (errno == EINTR)
continue;
} else {
if (act.sa_flags & SA_SIGINFO) {
if (act.sa_sigaction == NULL)
return (sig);
} else {
if (act.sa_handler == SIG_DFL)
return (sig);
}
}
sig++;
}
return (0);
}
/*
* the next three functions manage our pollset and the memory management for
* fd -> rtdata -> event -> poll_position maps
*/
static int
poll_add(struct rtsigop *op, struct event *ev, struct rtdata *dat)
{
struct pollfd *pfd;
int newmax = op->max << 1;
int pp;
if (op->poll == NULL)
return (0);
if (dat == NULL)
dat = ev2dat(op, ev, 0);
if (dat == NULL)
return (0);
pp = dat->poll_position;
if (pp != -1) {
pfd = &op->poll[pp];
if (ev->ev_events & EV_READ)
pfd->events |= POLLIN;
if (ev->ev_events & EV_WRITE)
pfd->events |= POLLOUT;
return (0);
}
if (op->cur == op->max) {
void *p = realloc(op->poll, sizeof(*op->poll) * newmax);
if (p == NULL) {
errno = ENOMEM;
return (-1);
}
op->poll = p;
p = realloc(op->ptodat, sizeof(*op->ptodat) * newmax);
if (p == NULL) {
/* shrink the pollset back down */
op->poll = realloc(op->poll,
sizeof(*op->poll) * op->max);
errno = ENOMEM;
return (-1);
}
op->ptodat = p;
op->max = newmax;
}
pfd = &op->poll[op->cur];
pfd->fd = ev->ev_fd;
pfd->revents = 0;
pfd->events = 0;
if (ev->ev_events & EV_READ)
pfd->events |= POLLIN;
if (ev->ev_events & EV_WRITE)
pfd->events |= POLLOUT;
op->ptodat[op->cur] = dat;
dat->poll_position = op->cur;
op->cur++;
return (0);
}
static void
poll_free(struct rtsigop *op, int n)
{
if (op->poll == NULL)
return;
op->cur--;
if (n < op->cur) {
memcpy(&op->poll[n], &op->poll[op->cur], sizeof(*op->poll));
op->ptodat[n] = op->ptodat[op->cur];
op->ptodat[n]->poll_position = n;
}
/* less than half the max in use causes us to shrink */
if (op->max > INIT_MAX && op->cur < op->max >> 1) {
op->max >>= 1;
op->poll = realloc(op->poll, sizeof(*op->poll) * op->max);
op->ptodat = realloc(op->ptodat, sizeof(*op->ptodat) * op->max);
}
}
static void
poll_remove(struct rtsigop *op, struct event *ev, struct rtdata *dat)
{
int pp;
if (dat == NULL)
dat = ev2dat(op, ev, 0);
if (dat == NULL) return;
pp = dat->poll_position;
if (pp != -1) {
poll_free(op, pp);
dat->poll_position = -1;
}
}
static void
activate(struct event *ev, int flags)
{
/* activate an event, possibly removing one-shot events */
if (!(ev->ev_events & EV_PERSIST))
event_del(ev);
event_active(ev, flags, 1);
}
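/* mark a descriptor close-on-exec; the 1 passed to F_SETFD is FD_CLOEXEC */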
#define FD_CLOSEONEXEC(x) do { \
if (fcntl(x, F_SETFD, 1) == -1) \
event_warn("fcntl(%d, F_SETFD)", x); \
} while (0)
void *
rtsig_init(void)
{
struct rtsigop *op;
int sockets[2];
int optarg;
struct rtdata *dat;
int flags;
if (getenv("EVENT_NORTSIG"))
goto err;
op = calloc(1, sizeof(*op));
if (op == NULL)
goto err;
op->max = INIT_MAX;
op->poll = malloc(sizeof(*op->poll) * op->max);
if (op->poll == NULL)
goto err_free_op;
op->signo = find_rt_signal();
if (op->signo == 0)
goto err_free_poll;
op->nonsock = 0;
if (socketpair(PF_UNIX, SOCK_DGRAM, 0, sockets) != 0)
goto err_free_poll;
FD_CLOSEONEXEC(sockets[0]);
FD_CLOSEONEXEC(sockets[1]);
signal_send_fd[op->signo - 1] = sockets[0];
trouble[op->signo - 1] = 0;
op->signal_send_fd = sockets[0];
op->signal_recv_fd = sockets[1];
flags = fcntl(op->signal_recv_fd, F_GETFL);
fcntl(op->signal_recv_fd, F_SETFL, flags | O_NONBLOCK);
optarg = MAXBUFFERSIZE;
setsockopt(signal_send_fd[op->signo - 1],
SOL_SOCKET, SO_SNDBUF,
&optarg, sizeof(optarg));
optarg = MAXBUFFERSIZE;
setsockopt(op->signal_recv_fd,
SOL_SOCKET, SO_RCVBUF,
&optarg, sizeof(optarg));
op->highestfd = -1;
op->fdtodat = NULL;
if (grow_fdset(op, 1) == -1)
goto err_close_pair;
op->ptodat = malloc(sizeof(*op->ptodat) * op->max);
if (op->ptodat == NULL)
goto err_close_pair;
sigemptyset(&op->sigs);
sigaddset(&op->sigs, SIGIO);
sigaddset(&op->sigs, op->signo);
sigprocmask(SIG_BLOCK, &op->sigs, NULL);
set_sigaction(SIGIO);
set_sigaction(op->signo);
event_set(&(op->sigfdev), op->signal_recv_fd, EV_READ|EV_PERSIST,
donothing, NULL);
op->sigfdev.ev_flags |= EVLIST_DONTDEL;
dat = ev2dat(op, &(op->sigfdev), 1);
poll_add(op, &(op->sigfdev), dat);
return (op);
err_close_pair:
close(op->signal_recv_fd);
close(signal_send_fd[op->signo - 1]);
err_free_poll:
free(op->poll);
err_free_op:
free(op);
err:
return (NULL);
}
int
rtsig_add(void *arg, struct event *ev)
{
struct rtsigop *op = (struct rtsigop *) arg;
int flags, i;
struct stat statbuf;
struct rtdata *dat;
if (ev->ev_events & EV_SIGNAL) {
int signo = EVENT_SIGNAL(ev);
sigaddset(&op->sigs, EVENT_SIGNAL(ev));
if (sigprocmask(SIG_BLOCK, &op->sigs, NULL) == -1)
return (-1);
set_sigaction(signo);
signal_send_fd[signo - 1] = op->signal_send_fd;
trouble[signo - 1] = 0;
return (0);
}
if (!(ev->ev_events & (EV_READ|EV_WRITE)))
return (0);
if (-1 == fstat(ev->ev_fd, &statbuf))
return (-1);
if (!S_ISSOCK(statbuf.st_mode))
ev->ev_flags |= EVLIST_NONSOCK;
flags = fcntl(ev->ev_fd, F_GETFL);
if (flags == -1)
return (-1);
if (!(flags & O_ASYNC)) {
if (fcntl(ev->ev_fd, F_SETSIG, op->signo) == -1 ||
fcntl(ev->ev_fd, F_SETOWN, (int) gettid()) == -1)
return (-1);
/*
* the overhead of always handling writeable edges
* isn't going to be that bad...
*/
if (fcntl(ev->ev_fd, F_SETFL, flags | O_ASYNC|O_RDWR))
return (-1);
}
#ifdef O_ONESIGFD
/*
* F_SETAUXFL and O_ONESIGFD are defined in a non-standard
* linux kernel patch to coalesce events for fds
*/
fcntl(ev->ev_fd, F_SETAUXFL, O_ONESIGFD);
#endif
dat = ev2dat(op, ev, 1);
if (dat == NULL)
return (-1);
op->total++;
if (ev->ev_flags & EVLIST_NONSOCK)
op->nonsock++;
/* new fd's go into the pollset once so we can check their level */
if (poll_add(op, ev, dat) == -1) {
	/* restore the original descriptor flags on failure */
	i = errno;
	fcntl(ev->ev_fd, F_SETFL, flags);
	errno = i;
	return (-1);
}
return (0);
}
int
rtsig_del(void *arg, struct event *ev)
{
struct rtdata *dat;
struct rtsigop *op = (struct rtsigop *) arg;
if (ev->ev_events & EV_SIGNAL) {
sigset_t sigs;
sigdelset(&op->sigs, EVENT_SIGNAL(ev));
sigemptyset(&sigs);
sigaddset(&sigs, EVENT_SIGNAL(ev));
return (sigprocmask(SIG_UNBLOCK, &sigs, NULL));
}
if (!(ev->ev_events & (EV_READ|EV_WRITE)))
return (0);
dat = ev2dat(op, ev, 0);
poll_remove(op, ev, dat);
dat_del(op, dat);
op->total--;
if (ev->ev_flags & EVLIST_NONSOCK)
op->nonsock--;
return (0);
}
int
rtsig_recalc(struct event_base *base, void *arg, int max)
{
return (0);
}
/*
* the following do_X functions implement the different stages of a single
* eventloop pass: poll(), recv(sigsock), sigtimedwait()
*
* do_siginfo_dispatch() is a common factor to both do_sigwait() and
* do_signals_from_socket().
*/
static inline int
do_poll(struct rtsigop *op, struct timespec *ts)
{
int res = 0;
int i = 0;
if (op->cur > 1) {
/* non-empty poll set (modulo the signalfd) */
if (op->nonsock) {
int timeout = ts->tv_nsec / 1000000 + ts->tv_sec * 1000;
sigprocmask(SIG_UNBLOCK, &(op->sigs), NULL);
res = poll(op->poll, op->cur, timeout);
sigprocmask(SIG_BLOCK, &(op->sigs), NULL);
ts->tv_sec = 0;
ts->tv_nsec = 0;
} else {
res = poll(op->poll, op->cur, 0);
}
if (res < 0) {
return (errno == EINTR ? 0 : -1);
} else if (res) {
ts->tv_sec = 0;
ts->tv_nsec = 0;
}
i = 0;
while (i < op->cur) {
struct rtdata *dat = op->ptodat[i];
struct event *ev = dat->ev;
if (op->poll[i].revents) {
int flags = 0;
if (op->poll[i].revents & (POLLIN | POLLERR))
flags |= EV_READ;
if (op->poll[i].revents & POLLOUT)
flags |= EV_WRITE;
if (!(ev->ev_events & EV_PERSIST)) {
poll_remove(op, ev, op->ptodat[i]);
event_del(ev);
} else {
i++;
}
event_active(ev, flags, 1);
} else {
if (ev->ev_flags & (EVLIST_NONSOCK|EVLIST_DONTDEL)) {
i++;
} else {
poll_remove(op, ev, op->ptodat[i]);
}
}
}
}
return (res);
}
static inline int
do_siginfo_dispatch(struct event_base *base, struct rtsigop *op,
siginfo_t *info)
{
int signum;
struct rtdata *dat, *next_dat;
struct event *ev, *next_ev;
if (info == NULL)
return (-1);
signum = info->si_signo;
if (signum == op->signo) {
int flags = 0;
if (info->si_band & (POLLIN|POLLERR))
flags |= EV_READ;
if (info->si_band & POLLOUT)
flags |= EV_WRITE;
if (!flags)
return (0);
if (info->si_fd > op->highestfd)
return (-1);
dat = TAILQ_FIRST(op->fdtodat[info->si_fd]);
while (dat != TAILQ_END(op->fdtodat[info->si_fd])) {
next_dat = TAILQ_NEXT(dat, next);
if (flags & dat->ev->ev_events) {
ev = dat->ev;
poll_add(op, ev, dat);
activate(ev, flags & ev->ev_events);
}
dat = next_dat;
}
} else if (signum == SIGIO) {
TAILQ_FOREACH(ev, &base->eventqueue, ev_next) {
if (ev->ev_events & (EV_READ|EV_WRITE))
poll_add(op, ev, NULL);
}
return (1); /* 1 means the caller should poll() again */
} else if (sigismember(&op->sigs, signum)) {
/* managed signals are queued */
ev = TAILQ_FIRST(&signalqueue);
while (ev != TAILQ_END(&signalqueue)) {
next_ev = TAILQ_NEXT(ev, ev_signal_next);
if (EVENT_SIGNAL(ev) == signum)
activate(ev, EV_SIGNAL);
ev = next_ev;
}
} else {
/* dispatch unmanaged signals immediately */
struct sigaction sa;
if (sigaction(signum, NULL, &sa) == 0) {
if ((sa.sa_flags & SA_SIGINFO) && sa.sa_sigaction) {
(*sa.sa_sigaction)(signum, info, NULL);
} else if (sa.sa_handler != SIG_DFL) {
	if (sa.sa_handler != SIG_IGN)
		(*sa.sa_handler)(signum);
} else {
if (signum != SIGCHLD) {
/* non-blocked SIG_DFL */
kill(gettid(), signum);
}
}
}
}
return (0);
}
/*
* return 1 if we should poll again
* return 0 if we are all set
* return -1 on error
*/
static inline int
do_sigwait(struct event_base *base, struct rtsigop *op, struct timespec *ts,
sigset_t *sigs)
{
for (;;) {
siginfo_t info;
int signum;
signum = sigtimedwait(sigs, &info, ts);
ts->tv_sec = 0;
ts->tv_nsec = 0;
if (signum == -1) {
if (errno == EAGAIN || errno == EINTR)
return (0);
return (-1);
} else if (1 == do_siginfo_dispatch(base, op, &info)) {
return (1);
}
}
/* NOTREACHED */
}
static inline int
do_signals_from_socket(struct event_base *base, struct rtsigop *op,
struct timespec *ts)
{
int fd = op->signal_recv_fd;
siginfo_t info;
int res;
for (;;) {
res = recv(fd, &info, sizeof(info), MSG_NOSIGNAL);
if (res == -1) {
if (errno == EAGAIN)
return (0);
if (errno == EINTR)
continue;
return (-1);
} else {
ts->tv_sec = 0;
ts->tv_nsec = 0;
if (1 == do_siginfo_dispatch(base, op, &info))
return (1);
}
}
/* NOTREACHED */
}
int
rtsig_dispatch(struct event_base *base, void *arg, struct timeval *tv)
{
struct rtsigop *op = (struct rtsigop *) arg;
struct timespec ts;
int res;
sigset_t sigs;
ts.tv_sec = tv->tv_sec;
ts.tv_nsec = tv->tv_usec * 1000;
poll_for_level:
res = do_poll(op, &ts); /* ts can be modified in do_XXX() */
if (res == -1)
	return (-1);
res = do_signals_from_socket(base, op, &ts);
if (res == 1)
goto poll_for_level;
else if (res == -1)
return (-1);
/*
* the mask = managed_signals | unblocked-signals
* MM - if this is not blocking do we need to cast the net this wide?
*/
sigemptyset(&sigs);
sigprocmask(SIG_BLOCK, &sigs, &sigs);
signotset(&sigs);
sigorset(&sigs, &sigs, &op->sigs);
res = do_sigwait(base, op, &ts, &sigs);
if (res == 1)
goto poll_for_level;
else if (res == -1)
return (-1);
return (0);
}