Edit

kc3-lang/libevent/evrpc.c

Branch :

  • Show log

    Commit

  • Author : Nick Mathewson
    Date : 2007-11-07 06:01:57
    Hash : f74e7258
    Message : r16501@catbus: nickm | 2007-11-07 01:00:31 -0500 This is one of those patches which will either make matters far simpler after the bugs shake out, or will get reverted pretty quick once we realize that it is a stupid idea. We now post-process the config.h file into a new event-config.h file, whose macros are prefixed with _EVENT_ and which is thus safe for headers to include. Using this, we can define replacement timeval manipulation functions in evutil.h, and use them uniformly through our code. We can also detect which headers are needful in event.h, and include them as required. This is also the perfect time to remove the long-deprecated acconfig.h file, so that autoheader no longer warns. Should resolve the following issues: [ 1826530 ] Header files should have access to autoconf output. [ 1826545 ] acconfig.h is deprecated. [ 1826564 ] On some platforms, event.h can't be included alone. svn:r492

  • evrpc.c
  • /*
     * Copyright (c) 2000-2004 Niels Provos <provos@citi.umich.edu>
     * All rights reserved.
     *
     * Redistribution and use in source and binary forms, with or without
     * modification, are permitted provided that the following conditions
     * are met:
     * 1. Redistributions of source code must retain the above copyright
     *    notice, this list of conditions and the following disclaimer.
     * 2. Redistributions in binary form must reproduce the above copyright
     *    notice, this list of conditions and the following disclaimer in the
     *    documentation and/or other materials provided with the distribution.
     * 3. The name of the author may not be used to endorse or promote products
     *    derived from this software without specific prior written permission.
     *
     * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
     * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
     * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
     * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
     * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
     * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
     * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
     * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
     * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     */
    #ifdef HAVE_CONFIG_H
    #include "config.h"
    #endif
    
    #ifdef WIN32
    #define WIN32_LEAN_AND_MEAN
    #include <windows.h>
    #include <winsock2.h>
    #undef WIN32_LEAN_AND_MEAN
    #include "misc.h"
    #endif
    
    #include <sys/types.h>
    #ifndef WIN32
    #include <sys/socket.h>
    #endif
    #ifdef HAVE_SYS_TIME_H
    #include <sys/time.h>
    #else
    #include <sys/_time.h>
    #endif
    #include <sys/queue.h>
    #include <stdio.h>
    #include <stdlib.h>
    #ifndef WIN32
    #include <unistd.h>
    #endif
    #include <errno.h>
    #include <signal.h>
    #include <string.h>
    #include <assert.h>
    
    #include "event.h"
    #include "evrpc.h"
    #include "evrpc-internal.h"
    #include "evhttp.h"
    #include "evutil.h"
    #include "log.h"
    
    struct evrpc_base *
    evrpc_init(struct evhttp *http_server)
    {
    	struct evrpc_base* base = calloc(1, sizeof(struct evrpc_base));
    	if (base == NULL)
    		return (NULL);
    
    	/* we rely on the tagging sub system */
    	evtag_init();
    
    	TAILQ_INIT(&base->registered_rpcs);
    	TAILQ_INIT(&base->input_hooks);
    	TAILQ_INIT(&base->output_hooks);
    	base->http_server = http_server;
    
    	return (base);
    }
    
    void
    evrpc_free(struct evrpc_base *base)
    {
    	struct evrpc *rpc;
    	struct evrpc_hook *hook;
    
    	while ((rpc = TAILQ_FIRST(&base->registered_rpcs)) != NULL) {
    		assert(evrpc_unregister_rpc(base, rpc->uri));
    	}
    	while ((hook = TAILQ_FIRST(&base->input_hooks)) != NULL) {
    		assert(evrpc_remove_hook(base, INPUT, hook));
    	}
    	while ((hook = TAILQ_FIRST(&base->output_hooks)) != NULL) {
    		assert(evrpc_remove_hook(base, OUTPUT, hook));
    	}
    	free(base);
    }
    
    void *
    evrpc_add_hook(void *vbase,
        enum EVRPC_HOOK_TYPE hook_type,
        int (*cb)(struct evhttp_request *, struct evbuffer *, void *),
        void *cb_arg)
    {
    	struct _evrpc_hooks *base = vbase;
    	struct evrpc_hook_list *head = NULL;
    	struct evrpc_hook *hook = NULL;
    	switch (hook_type) {
    	case INPUT:
    		head = &base->in_hooks;
    		break;
    	case OUTPUT:
    		head = &base->out_hooks;
    		break;
    	default:
    		assert(hook_type == INPUT || hook_type == OUTPUT);
    	}
    
    	hook = calloc(1, sizeof(struct evrpc_hook));
    	assert(hook != NULL);
    	
    	hook->process = cb;
    	hook->process_arg = cb_arg;
    	TAILQ_INSERT_TAIL(head, hook, next);
    
    	return (hook);
    }
    
    static int
    evrpc_remove_hook_internal(struct evrpc_hook_list *head, void *handle)
    {
    	struct evrpc_hook *hook = NULL;
    	TAILQ_FOREACH(hook, head, next) {
    		if (hook == handle) {
    			TAILQ_REMOVE(head, hook, next);
    			free(hook);
    			return (1);
    		}
    	}
    
    	return (0);
    }
    
    /*
     * remove the hook specified by the handle
     */
    
    int
    evrpc_remove_hook(void *vbase, enum EVRPC_HOOK_TYPE hook_type, void *handle)
    {
    	struct _evrpc_hooks *base = vbase;
    	struct evrpc_hook_list *head = NULL;
    	switch (hook_type) {
    	case INPUT:
    		head = &base->in_hooks;
    		break;
    	case OUTPUT:
    		head = &base->out_hooks;
    		break;
    	default:
    		assert(hook_type == INPUT || hook_type == OUTPUT);
    	}
    
    	return (evrpc_remove_hook_internal(head, handle));
    }
    
    static int
    evrpc_process_hooks(struct evrpc_hook_list *head,
        struct evhttp_request *req, struct evbuffer *evbuf)
    {
    	struct evrpc_hook *hook;
    	TAILQ_FOREACH(hook, head, next) {
    		if (hook->process(req, evbuf, hook->process_arg) == -1)
    			return (-1);
    	}
    
    	return (0);
    }
    
    static void evrpc_pool_schedule(struct evrpc_pool *pool);
    static void evrpc_request_cb(struct evhttp_request *, void *);
    void evrpc_request_done(struct evrpc_req_generic*);
    
    /*
     * Registers a new RPC with the HTTP server.   The evrpc object is expected
     * to have been filled in via the EVRPC_REGISTER_OBJECT macro which in turn
     * calls this function.
     */
    
    char *
    evrpc_construct_uri(const char *uri)
    {
    	char *constructed_uri;
    	int constructed_uri_len;
    
    	constructed_uri_len = strlen(EVRPC_URI_PREFIX) + strlen(uri) + 1;
    	if ((constructed_uri = malloc(constructed_uri_len)) == NULL)
    		event_err(1, "%s: failed to register rpc at %s",
    		    __func__, uri);
    	memcpy(constructed_uri, EVRPC_URI_PREFIX, strlen(EVRPC_URI_PREFIX));
    	memcpy(constructed_uri + strlen(EVRPC_URI_PREFIX), uri, strlen(uri));
    	constructed_uri[constructed_uri_len - 1] = '\0';
    
    	return (constructed_uri);
    }
    
    int
    evrpc_register_rpc(struct evrpc_base *base, struct evrpc *rpc,
        void (*cb)(struct evrpc_req_generic *, void *), void *cb_arg)
    {
    	char *constructed_uri = evrpc_construct_uri(rpc->uri);
    
    	rpc->base = base;
    	rpc->cb = cb;
    	rpc->cb_arg = cb_arg;
    
    	TAILQ_INSERT_TAIL(&base->registered_rpcs, rpc, next);
    
    	evhttp_set_cb(base->http_server,
    	    constructed_uri,
    	    evrpc_request_cb,
    	    rpc);
    	
    	free(constructed_uri);
    
    	return (0);
    }
    
    int
    evrpc_unregister_rpc(struct evrpc_base *base, const char *name)
    {
    	char *registered_uri = NULL;
    	struct evrpc *rpc;
    
    	/* find the right rpc; linear search might be slow */
    	TAILQ_FOREACH(rpc, &base->registered_rpcs, next) {
    		if (strcmp(rpc->uri, name) == 0)
    			break;
    	}
    	if (rpc == NULL) {
    		/* We did not find an RPC with this name */
    		return (-1);
    	}
    	TAILQ_REMOVE(&base->registered_rpcs, rpc, next);
    	
    	free((char *)rpc->uri);
    	free(rpc);
    
            registered_uri = evrpc_construct_uri(name);
    
    	/* remove the http server callback */
    	assert(evhttp_del_cb(base->http_server, registered_uri) == 0);
    
    	free(registered_uri);
    	return (0);
    }
    
    static void
    evrpc_request_cb(struct evhttp_request *req, void *arg)
    {
    	struct evrpc *rpc = arg;
    	struct evrpc_req_generic *rpc_state = NULL;
    
    	/* let's verify the outside parameters */
    	if (req->type != EVHTTP_REQ_POST ||
    	    EVBUFFER_LENGTH(req->input_buffer) <= 0)
    		goto error;
    
    	/*
    	 * we might want to allow hooks to suspend the processing,
    	 * but at the moment, we assume that they just act as simple
    	 * filters.
    	 */
    	if (evrpc_process_hooks(&rpc->base->input_hooks,
    		req, req->input_buffer) == -1)
    		goto error;
    
    	rpc_state = calloc(1, sizeof(struct evrpc_req_generic));
    	if (rpc_state == NULL)
    		goto error;
    
    	/* let's check that we can parse the request */
    	rpc_state->request = rpc->request_new();
    	if (rpc_state->request == NULL)
    		goto error;
    
    	rpc_state->rpc = rpc;
    
    	if (rpc->request_unmarshal(
    		    rpc_state->request, req->input_buffer) == -1) {
    		/* we failed to parse the request; that's a bummer */
    		goto error;
    	}
    
    	/* at this point, we have a well formed request, prepare the reply */
    
    	rpc_state->reply = rpc->reply_new();
    	if (rpc_state->reply == NULL)
    		goto error;
    
    	rpc_state->http_req = req;
    	rpc_state->done = evrpc_request_done;
    
    	/* give the rpc to the user; they can deal with it */
    	rpc->cb(rpc_state, rpc->cb_arg);
    
    	return;
    
    error:
    	evrpc_reqstate_free(rpc_state);
    	evhttp_send_error(req, HTTP_SERVUNAVAIL, "Service Error");
    	return;
    }
    
    void
    evrpc_reqstate_free(struct evrpc_req_generic* rpc_state)
    {
    	/* clean up all memory */
    	if (rpc_state != NULL) {
    		struct evrpc *rpc = rpc_state->rpc;
    
    		if (rpc_state->request != NULL)
    			rpc->request_free(rpc_state->request);
    		if (rpc_state->reply != NULL)
    			rpc->reply_free(rpc_state->reply);
    		free(rpc_state);
    	}
    }
    
    void
    evrpc_request_done(struct evrpc_req_generic* rpc_state)
    {
    	struct evhttp_request *req = rpc_state->http_req;
    	struct evrpc *rpc = rpc_state->rpc;
    	struct evbuffer* data = NULL;
    
    	if (rpc->reply_complete(rpc_state->reply) == -1) {
    		/* the reply was not completely filled in.  error out */
    		goto error;
    	}
    
    	if ((data = evbuffer_new()) == NULL) {
    		/* out of memory */
    		goto error;
    	}
    
    	/* serialize the reply */
    	rpc->reply_marshal(data, rpc_state->reply);
    
    	/* do hook based tweaks to the request */
    	if (evrpc_process_hooks(&rpc->base->output_hooks,
    		req, data) == -1)
    		goto error;
    
    	evhttp_send_reply(req, HTTP_OK, "OK", data);
    
    	evbuffer_free(data);
    
    	evrpc_reqstate_free(rpc_state);
    
    	return;
    
    error:
    	if (data != NULL)
    		evbuffer_free(data);
    	evrpc_reqstate_free(rpc_state);
    	evhttp_send_error(req, HTTP_SERVUNAVAIL, "Service Error");
    	return;
    }
    
    /* Client implementation of RPC site */
    
    static int evrpc_schedule_request(struct evhttp_connection *connection,
        struct evrpc_request_wrapper *ctx);
    
    struct evrpc_pool *
    evrpc_pool_new(struct event_base *base)
    {
    	struct evrpc_pool *pool = calloc(1, sizeof(struct evrpc_pool));
    	if (pool == NULL)
    		return (NULL);
    
    	TAILQ_INIT(&pool->connections);
    	TAILQ_INIT(&pool->requests);
    
    	TAILQ_INIT(&pool->input_hooks);
    	TAILQ_INIT(&pool->output_hooks);
    
    	pool->base = base;
    	pool->timeout = -1;
    
    	return (pool);
    }
    
    static void
    evrpc_request_wrapper_free(struct evrpc_request_wrapper *request)
    {
    	free(request->name);
    	free(request);
    }
    
    void
    evrpc_pool_free(struct evrpc_pool *pool)
    {
    	struct evhttp_connection *connection;
    	struct evrpc_request_wrapper *request;
    	struct evrpc_hook *hook;
    
    	while ((request = TAILQ_FIRST(&pool->requests)) != NULL) {
    		TAILQ_REMOVE(&pool->requests, request, next);
    		/* if this gets more complicated we need our own function */
    		evrpc_request_wrapper_free(request);
    	}
    
    	while ((connection = TAILQ_FIRST(&pool->connections)) != NULL) {
    		TAILQ_REMOVE(&pool->connections, connection, next);
    		evhttp_connection_free(connection);
    	}
    
    	while ((hook = TAILQ_FIRST(&pool->input_hooks)) != NULL) {
    		assert(evrpc_remove_hook(pool, INPUT, hook));
    	}
    
    	while ((hook = TAILQ_FIRST(&pool->output_hooks)) != NULL) {
    		assert(evrpc_remove_hook(pool, OUTPUT, hook));
    	}
    
    	free(pool);
    }
    
    /*
     * Add a connection to the RPC pool.   A request scheduled on the pool
     * may use any available connection.
     */
    
    void
    evrpc_pool_add_connection(struct evrpc_pool *pool,
        struct evhttp_connection *connection) {
    	assert(connection->http_server == NULL);
    	TAILQ_INSERT_TAIL(&pool->connections, connection, next);
    
    	/*
    	 * associate an event base with this connection
    	 */
    	if (pool->base != NULL)
    		evhttp_connection_set_base(connection, pool->base);
    
    	/* 
    	 * unless a timeout was specifically set for a connection,
    	 * the connection inherits the timeout from the pool.
    	 */
    	if (connection->timeout == -1)
    		connection->timeout = pool->timeout;
    
    	/* 
    	 * if we have any requests pending, schedule them with the new
    	 * connections.
    	 */
    
    	if (TAILQ_FIRST(&pool->requests) != NULL) {
    		struct evrpc_request_wrapper *request = 
    		    TAILQ_FIRST(&pool->requests);
    		TAILQ_REMOVE(&pool->requests, request, next);
    		evrpc_schedule_request(connection, request);
    	}
    }
    
    void
    evrpc_pool_set_timeout(struct evrpc_pool *pool, int timeout_in_secs)
    {
    	struct evhttp_connection *evcon;
    	TAILQ_FOREACH(evcon, &pool->connections, next) {
    		evcon->timeout = timeout_in_secs;
    	}
    	pool->timeout = timeout_in_secs;
    }
    
    
    static void evrpc_reply_done(struct evhttp_request *, void *);
    static void evrpc_request_timeout(int, short, void *);
    
    /*
     * Finds a connection object associated with the pool that is currently
     * idle and can be used to make a request.
     */
    static struct evhttp_connection *
    evrpc_pool_find_connection(struct evrpc_pool *pool)
    {
    	struct evhttp_connection *connection;
    	TAILQ_FOREACH(connection, &pool->connections, next) {
    		if (TAILQ_FIRST(&connection->requests) == NULL)
    			return (connection);
    	}
    
    	return (NULL);
    }
    
    /*
     * We assume that the ctx is no longer queued on the pool.
     */
    static int
    evrpc_schedule_request(struct evhttp_connection *connection,
        struct evrpc_request_wrapper *ctx)
    {
    	struct evhttp_request *req = NULL;
    	struct evrpc_pool *pool = ctx->pool;
    	struct evrpc_status status;
    	char *uri = NULL;
    	int res = 0;
    
    	if ((req = evhttp_request_new(evrpc_reply_done, ctx)) == NULL)
    		goto error;
    
    	/* serialize the request data into the output buffer */
    	ctx->request_marshal(req->output_buffer, ctx->request);
    
    	uri = evrpc_construct_uri(ctx->name);
    	if (uri == NULL)
    		goto error;
    
    	/* we need to know the connection that we might have to abort */
    	ctx->evcon = connection;
    
    	/* apply hooks to the outgoing request */
    	if (evrpc_process_hooks(&pool->output_hooks,
    		req, req->output_buffer) == -1)
    		goto error;
    
    	if (pool->timeout > 0) {
    		/* 
    		 * a timeout after which the whole rpc is going to be aborted.
    		 */
    		struct timeval tv;
    		evutil_timerclear(&tv);
    		tv.tv_sec = pool->timeout;
    		evtimer_add(&ctx->ev_timeout, &tv);
    	}
    
    	/* start the request over the connection */
    	res = evhttp_make_request(connection, req, EVHTTP_REQ_POST, uri);
    	free(uri);
    
    	if (res == -1)
    		goto error;
    
    	return (0);
    
    error:
    	memset(&status, 0, sizeof(status));
    	status.error = EVRPC_STATUS_ERR_UNSTARTED;
    	(*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
    	evrpc_request_wrapper_free(ctx);
    	return (-1);
    }
    
    int
    evrpc_make_request(struct evrpc_request_wrapper *ctx)
    {
    	struct evrpc_pool *pool = ctx->pool;
    
    	/* initialize the event structure for this rpc */
    	evtimer_set(&ctx->ev_timeout, evrpc_request_timeout, ctx);
    	if (pool->base != NULL)
    		event_base_set(pool->base, &ctx->ev_timeout);
    
    	/* we better have some available connections on the pool */
    	assert(TAILQ_FIRST(&pool->connections) != NULL);
    
    	/* 
    	 * if no connection is available, we queue the request on the pool,
    	 * the next time a connection is empty, the rpc will be send on that.
    	 */
    	TAILQ_INSERT_TAIL(&pool->requests, ctx, next);
    
    	evrpc_pool_schedule(pool);
    
    	return (0);
    }
    
    static void
    evrpc_reply_done(struct evhttp_request *req, void *arg)
    {
    	struct evrpc_request_wrapper *ctx = arg;
    	struct evrpc_pool *pool = ctx->pool;
    	struct evrpc_status status;
    	int res = -1;
    	
    	/* cancel any timeout we might have scheduled */
    	event_del(&ctx->ev_timeout);
    
    	memset(&status, 0, sizeof(status));
    	status.http_req = req;
    
    	/* we need to get the reply now */
    	if (req != NULL) {
    		/* apply hooks to the incoming request */
    		if (evrpc_process_hooks(&pool->input_hooks,
    			req, req->input_buffer) == -1) {
    			status.error = EVRPC_STATUS_ERR_HOOKABORTED;
    			res = -1;
    		} else {
    			res = ctx->reply_unmarshal(ctx->reply,
    			    req->input_buffer);
    			if (res == -1) {
    				status.error = EVRPC_STATUS_ERR_BADPAYLOAD;
    			}
    		}
    	} else {
    		status.error = EVRPC_STATUS_ERR_TIMEOUT;
    	}
    
    	if (res == -1) {
    		/* clear everything that we might have written previously */
    		ctx->reply_clear(ctx->reply);
    	}
    
    	(*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
    	
    	evrpc_request_wrapper_free(ctx);
    
    	/* the http layer owns the request structure */
    
    	/* see if we can schedule another request */
    	evrpc_pool_schedule(pool);
    }
    
    static void
    evrpc_pool_schedule(struct evrpc_pool *pool)
    {
    	struct evrpc_request_wrapper *ctx = TAILQ_FIRST(&pool->requests);
    	struct evhttp_connection *evcon;
    
    	/* if no requests are pending, we have no work */
    	if (ctx == NULL)
    		return;
    
    	if ((evcon = evrpc_pool_find_connection(pool)) != NULL) {
    		TAILQ_REMOVE(&pool->requests, ctx, next);
    		evrpc_schedule_request(evcon, ctx);
    	}
    }
    
    static void
    evrpc_request_timeout(int fd, short what, void *arg)
    {
    	struct evrpc_request_wrapper *ctx = arg;
    	struct evhttp_connection *evcon = ctx->evcon;
    	assert(evcon != NULL);
    
    	evhttp_connection_fail(evcon, EVCON_HTTP_TIMEOUT);
    }