/*
 * Asterisk -- A telephony toolkit for Linux.
 *
 * Non-blocking I/O socket manager/implementation for asterisk.
 *
 *  - Theo P. Zourzouvillys <theo@crazygreek.co.uk>
 *
 * This program is free software, distributed under the terms of
 * the GNU General Public License
 */

/*
 * The idea behind this is that many of Asterisk's internal
 * parts start up threads for themselves when they don't
 * really need to.  A fair few threads in Asterisk could cope
 * perfectly well with non-blocking sockets and a single
 * thread to service them all.
 *
 * This code tries to be that solution.  Think of this code as an
 * 'asterisk network abstraction layer' - A socket is registered
 * with the NBIO subsystem, and it takes care of poll()'ing, 
 * dispatching events by callbacks, buffering (or not), etc.

 * However - only sockets that can guarantee not to block should
 * use this.  If you need to block, think twice about why - there's
 * not much you can't do with non-blocking I/O.  If you're REALLY
 * sure you need to block, then launch your own thread for that.
 * 
 * Because almost all network activity can flow through here, we
 * focus on a rock solid, damn fast network implementation, making
 * sure we do things like lingering closes, Nagle, output buffering,
 * etc.
 * 
 * The API is documented in <include/asterisk/nbio.h>
 * 
 */

/*
 * 
 * The way we go about this may seem a bit strange, but
 * the most important thing for us is how quickly we can
 * service sockets - so we make sure we can do it damn
 * fast.
 *
 * That's why we have 2 structs - one for poll() and one
 * for the sockets we're currently servicing.
 *
 * Because of that, lookup time is next to none
 * compared to your average multi-socket servicing
 * routine.
 *
 */

#include <errno.h>
#include <fcntl.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>

#include <asterisk/cli.h>
#include <asterisk/nbio.h>
#include <asterisk/lock.h>
#include <asterisk/utils.h>
#include <asterisk/logger.h>
#include <asterisk/module.h>
#include <asterisk/options.h>

/*
 * How many fd's we can initially handle without reallocating
 * memory.  nbio_start() allocates nbio_array with this many slots.
 *
 * XXX:T: Tune this once we've got core parts using NBIO.
 */

#define NBIO_DEFAULT_ARRAY_SIZE 8

/*
 * nbio_manager    - taken by add_fd(), handle_control(), show_sockets(),
 *                   nbio_start() and nbio_stop().
 * nbio_start_lock - paired with nbio_thread_cond for the start-up handshake
 *                   between nbio_start() and nbio_thread().
 * nbio_array_lock - taken by nbio_thread() when it starts and released only
 *                   when it exits.
 */
AST_MUTEX_DEFINE_STATIC(nbio_manager);
AST_MUTEX_DEFINE_STATIC(nbio_start_lock);
AST_MUTEX_DEFINE_STATIC(nbio_array_lock);

/* Set to 1 by nbio_start() once the thread is running; reset by nbio_stop(). */
static int nbio_init = 0;
/* Read end of the control pipe; polled by the NBIO thread. */
static int nbio_control_fd = -1;
/* Write end of the control pipe; a byte written here wakes the NBIO thread. */
static int nbio_unblock_fd = -1;
static pthread_t nbio_thread_id;
/* Signalled by nbio_thread() as soon as it has started. */
static pthread_cond_t nbio_thread_cond = PTHREAD_COND_INITIALIZER;
/* Result of sysconf(_SC_OPEN_MAX), stored by nbio_start(). */
static int nbio_maxfd = -1;

/* Global array of current fd's being processed */

static nbio_socket_t ** nbio_array = NULL;
static int nbio_array_size = 0;		/* allocated slots in nbio_array */
static int nbio_array_count = 0;	/* slots of nbio_array in use */

/*
 * The pollfd set handed to poll().  Entry i describes nbio_array[i]; it is
 * rebuilt by add_fd_fromthread() whenever a socket is added.
 */

struct pollfd * poll_array;
typedef struct _nbio_control nbio_control_t;

/*
 * Pass message to the controller thread.
 *
 * Requests form a singly linked queue headed by nbio_control: add_fd()
 * appends to the tail and handle_control() pops the head.
 */
struct _nbio_control {
	enum {
		NBIO_CONTROL_ADD,
		NBIO_CONTROL_DEL,		
	} type;
	nbio_socket_t * ptr;	/* socket the request applies to */
	int fd;			/* not referenced by the code in this file */
	nbio_control_t * next;	/* next queued request, or NULL */
} * nbio_control = NULL;

/*
 * Fetch the pending error of a socket.
 *
 * Returns the SO_ERROR value of fd (0 when no error is pending).  Reading
 * SO_ERROR clears it, so call this once per event and keep the result.
 * If getsockopt() itself fails (bad descriptor, not a socket, ...) the
 * errno it set is returned instead of an uninitialised value.
 */
static
int sock_err(int fd)
{
	int ret = 0;
	socklen_t len = sizeof(ret);

	if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &ret, &len) != 0)
		return errno;

	return ret;
}

/*
 * Register a socket with the poll loop.  Runs on the NBIO thread.
 *
 * Appends nbio to nbio_array (growing it when full) and rebuilds
 * poll_array so that entry i again describes nbio_array[i].
 *
 * Returns 0 on success, -1 if memory could not be allocated; on failure
 * nbio_array and poll_array are left exactly as they were.
 */
static int
add_fd_fromthread(nbio_socket_t * nbio)
{
	struct pollfd * new_poll;
	int i;

	ast_log(LOG_DEBUG, "Adding item current=%d, size=%d\n", nbio_array_count, nbio_array_size);

	/*
	 * Allocate the replacement poll set before touching anything, so an
	 * allocation failure cannot leave us without a usable poll_array.
	 */

	new_poll = ast_new0(struct pollfd, nbio_array_count + 1);

	if (AST_UNLIKELY(new_poll == NULL)) {
		ast_log(LOG_ERROR, "Failed to allocate NBIO poll array\n");
		return -1;
	}

	if (nbio_array_size == nbio_array_count) {

		/*
		 * nbio_array is full - make it larger.  Doubling a size of 0
		 * would get us nowhere, so fall back to the default size.
		 */

		int new_size = nbio_array_size > 0 ? nbio_array_size * 2 : NBIO_DEFAULT_ARRAY_SIZE;
		nbio_socket_t ** new_array = ast_new0(nbio_socket_t *, new_size);

		if (AST_UNLIKELY(new_array == NULL)) {
			ast_log(LOG_ERROR, "Failed to resize NBIO array\n");
			ast_free(new_poll);
			return -1;
		}

		if (nbio_array != NULL) {
			memcpy(new_array, nbio_array, sizeof(nbio_socket_t *) * nbio_array_count);
			ast_free(nbio_array);
		}

		nbio_array = new_array;
		nbio_array_size = new_size;
		ast_log(LOG_DEBUG, "Enlarged NBIO array to %d elements [%p]\n", nbio_array_size, (void *) nbio_array[0]);
	}

	/* desc may be unset; never hand a NULL pointer to %s. */
	ast_log(LOG_DEBUG, "Added NBIO socket with FD %d to slot %d (%s)\n", nbio->fd, nbio_array_count, nbio->desc ? nbio->desc : "(no description)");

	nbio_array[nbio_array_count++] = nbio;

	// XXX:T: Something else might be used here to stop possible memory fragmentation.

	for (i = 0 ; i < nbio_array_count ; i++) {
		new_poll[i].fd = nbio_array[i]->fd;
		new_poll[i].events = POLLIN|POLLERR|POLLHUP|POLLNVAL;
	}

	ast_free(poll_array);
	poll_array = new_poll;

	return 0;
}

/*
 * Unregister a socket from the poll loop.  Runs on the NBIO thread.
 *
 * Removes nbio from nbio_array and the matching entry from poll_array
 * (the two arrays are indexed in parallel and must stay in step).  If the
 * socket still has a descriptor, an incoming socket's close callback is
 * invoked and the descriptor is closed.
 *
 * The nbio structure itself is not freed here.
 *
 * Returns 0 on success, -1 if nbio is not registered.
 */
static int
del_fd_fromthread(nbio_socket_t * nbio)
{
	int pos;
	int tail;

	/*
	 * Find this element's position in nbio_array
	 */

	for (pos = 0 ; pos < nbio_array_count ; pos++) {
		if (nbio_array[pos] == nbio)
			break;
	}

	if (pos == nbio_array_count) {
		/*
		 * Damn - we can't find it.
		 */

		ast_log(LOG_ERROR, "Attempted to remove unregistered NBIO socket %p!?\n", (void *) nbio);
		return -1;
	}

	/*
	 * Splice this element out by shifting everything after it down one
	 * slot.  memmove() takes a byte count, so scale by the element size.
	 */

	tail = nbio_array_count - pos - 1;

	if (tail > 0) {
		memmove(nbio_array + pos, nbio_array + pos + 1, sizeof(*nbio_array) * tail);
		if (poll_array != NULL)
			memmove(poll_array + pos, poll_array + pos + 1, sizeof(*poll_array) * tail);
	}

	nbio_array_count--;
	nbio_array[nbio_array_count] = NULL;

	ast_log(LOG_DEBUG, "Removed FD %d from NBIO subsystem\n", nbio->fd);

	if (nbio->fd >= 0) {
		if (nbio->flags & AST_NBIO_TYPE_INCOMING) {
			/* Check the callback we are actually about to call. */
			if (nbio->cb.incoming && nbio->cb.incoming->close)
				nbio->cb.incoming->close(nbio, AST_NBIO_EVENT_HUP, nbio->user_data);
		}
		close(nbio->fd);
		nbio->fd = -1;
	}

	return 0;
}

static void
handle_control(void)
{
	nbio_control_t * control;
	ast_mutex_lock(&nbio_manager);

	control = nbio_control;
	nbio_control = control->next;

	switch (control->type) {
		case NBIO_CONTROL_ADD:
			add_fd_fromthread(control->ptr);
			break;
		case NBIO_CONTROL_DEL:
			// XXX:T: Implementation Required!
			break;
		default:
			ast_log(LOG_ERROR, "Unhandled NBIO control packet of type %d\n", control->type);
			break;
	}

	ast_free(control);
	ast_mutex_unlock(&nbio_manager);
}


/*
 * Accept one pending connection on a listening NBIO socket and hand it to
 * the listener's callback.
 *
 * The peer address buffer is sized from nbio->sin_len (at least
 * sizeof(struct sockaddr)) and is always released before returning.  The
 * new descriptor is closed again if it cannot be configured, if no listen
 * callback is set, or if the callback returns -1.
 */
static void
nbio_accept_pending(nbio_socket_t * nbio)
{
	socklen_t sinlen = (socklen_t) MAX(nbio->sin_len, sizeof(struct sockaddr));
	void * sin = ast_malloc0(sinlen);
	int new_fd;

	if (sin == NULL) {
		ast_log(LOG_ERROR, "Failed to allocate peer address buffer for listen socket %d\n", nbio->fd);
		return;
	}

	new_fd = accept(nbio->fd, (struct sockaddr *) sin, &sinlen);

	if (new_fd == -1) {
		ast_log(LOG_ERROR, "Failed to call accept() on listen socket: %s\n", strerror(errno));
		ast_free(sin);
		return;
	}

	if (nbio->flags & AST_NBIO_FLAG_NODELAY) {
		int arg = 1;

		if (setsockopt(new_fd, IPPROTO_TCP, TCP_NODELAY, &arg, sizeof(arg)) != 0) {
			ast_log(LOG_ERROR, "Failed to setsockopt(TCP_NODELAY) on new connection from %s: %s\n", nbio->desc, strerror(errno));
			ast_free(sin);
			close(new_fd);
			return;
		}
	}

#if 0
	if (!(nbio->flags & AST_NBIO_FLAG_BLOCK)) {
		if (fcntl(new_fd, F_SETFL, O_NONBLOCK) != 0) {
			ast_log(LOG_ERROR, "Failed to fcntl(O_NONBLOCK) on new connection from %s: %s\n", nbio->desc, strerror(errno));
			ast_free(sin);
			close(new_fd);
			return;
		}
	}
#endif

	if (nbio->incoming.listen == NULL) {
		ast_log(LOG_ERROR, "New connection, but no callback for listen FD %d\n", nbio->fd);
		close(new_fd);
	} else if (nbio->incoming.listen(nbio->fd, new_fd, sin, nbio->user_data) == -1) {
		/* The callback refused the connection. */
		close(new_fd);
	}

	ast_free(sin);
}

/*
 * Dispatch the poll() events reported for one registered socket.
 *
 * nbio  - the socket the events belong to
 * flags - the revents value poll() produced for it
 *
 * Readable data is handled before error conditions so nothing is left in
 * the socket's buffer.  Only one of POLLIN / POLLNVAL / POLLERR / POLLHUP
 * is acted on per call, in that order of precedence.
 *
 * Returns -1 if the socket was removed from nbio_array (the caller must
 * stop walking nbio_array / poll_array and re-poll), 0 otherwise.
 */
static int
handle_poll_event(nbio_socket_t * nbio, int flags)
{
	int res;
	int err;

	/*
	 * Reading SO_ERROR clears it, so fetch it exactly once; asking twice
	 * would report the real error in one place and 0 in the other.
	 */

	err = sock_err(nbio->fd);

	ast_log(LOG_ERROR, "Activity on FD %d: %d (%s) (errno %d)\n", nbio->fd, flags, strerror(err), err);

	if (flags & POLLIN) {

		/*
		 * We read data before we cause errors, as we want to make sure there is
		 * no data left in the socket's buffer.
		 */

		if (nbio->flags & AST_NBIO_TYPE_INCOMING) {

			if (nbio->cb.incoming == NULL || nbio->cb.incoming->data == NULL) {
				ast_log(LOG_ERROR, "Incoming data, but no callback for FD %d\n", nbio->fd);
				del_fd_fromthread(nbio);
				return -1;
			}

			res = nbio->cb.incoming->data(nbio, AST_NBIO_EVENT_IN, nbio->user_data);

			if (res == -1) {
				ast_log(LOG_ERROR, "Incoming data callback signaled to close() for FD %d\n", nbio->fd);
				del_fd_fromthread(nbio);
				return -1;
			}

		} else if (nbio->flags & AST_NBIO_TYPE_LISTEN) {

			nbio_accept_pending(nbio);

		}

	} else if (flags & POLLNVAL) {

		/*
		 * Invalid FD - Panic!
		 * We should really never see this.  If we do, something is screwed.
		 *
		 * The entry has to go, otherwise poll() reports POLLNVAL again
		 * immediately and we spin.  The descriptor is not open, so forget
		 * the number instead of close()ing it - it may already belong to
		 * somebody else.
		 */

		if (nbio->cb.incoming && nbio->cb.incoming->error)
			nbio->cb.incoming->error(nbio, AST_NBIO_EVENT_NVAL, nbio->user_data);

		nbio->fd = -1;
		del_fd_fromthread(nbio);
		return -1;

	} else if (flags & POLLERR) {

		/*
		 * An error has occurred on this FD. Notify cb, and remove.
		 */

		if (nbio->cb.incoming && nbio->cb.incoming->error)
			nbio->cb.incoming->error(nbio, AST_NBIO_EVENT_ERR, nbio->user_data);

		del_fd_fromthread(nbio);
		return -1;

	} else if (flags & POLLHUP) {

		/*
		 * Connection closed by remote end.
		 *
		 * NOTE(review): del_fd_fromthread() may invoke the close
		 * callback as well for incoming sockets - confirm whether the
		 * callback is meant to be called twice on hangup.
		 */

		if (nbio->cb.incoming && nbio->cb.incoming->close)
			nbio->cb.incoming->close(nbio, AST_NBIO_EVENT_HUP, nbio->user_data);

		del_fd_fromthread(nbio);
		return -1;

	} else {

		ast_log(LOG_NOTICE, "Unhandled event on FD\n");

	}

	return 0;
}

/*
 * The main NBIO thread.
 *
 * Signals nbio_thread_cond once it is running, then loops on poll() over
 * poll_array and dispatches every entry with pending events:
 *
 *  - the control pipe carries one-byte commands: 'U' (process one queued
 *    control request) and 'D' (shut down);
 *  - everything else goes to handle_poll_event().
 *
 * nbio_array_lock is held from start-up until the loop exits.  On exit the
 * read end of the control pipe is closed.  Always returns NULL.
 */

static void * nbio_thread(void * data)
{
	int ret = 0;
	int i;
	int die = 0;
	int err = 0;	/* errno of the failure that ended the loop, if any */

	(void) data;

	ast_mutex_lock(&nbio_array_lock);

	ast_log(LOG_DEBUG, "NBIO thread started as PID %d, %d element(s)\n", getpid(), nbio_array_count);

	/* Tell nbio_start() that we are up. */
	ast_mutex_lock(&nbio_start_lock);
	pthread_cond_signal(&nbio_thread_cond);
	ast_mutex_unlock(&nbio_start_lock);

	while (AST_LIKELY(die == 0)) {

		/*
		 * We can block forever here because we get our signaling from
		 * poll_array[0].fd which is the pipe() created in nbio_start().
		 */

		ret = poll(poll_array, nbio_array_count, -1);

		if (AST_UNLIKELY(ret == -1)) {
			if (errno == EINTR)
				continue;
			err = errno;
			break;
		}

		if (ret == 0)
			continue;

		/*
		 * Loop through all sockets, and see which ones have activity
		 */

		for (i = 0 ; i < nbio_array_count; i++) {

			if (!poll_array[i].revents)
				continue;

			/*
			 * First check the control fd, see if there is any activity.
			 */

			if (nbio_array[i]->flags == AST_NBIO_TYPE_CONTROL) {

				unsigned char c = 0;
				ssize_t got = read(nbio_array[i]->fd, &c, 1);

				if (got != 1) {
					if (got == -1) {
						err = errno;
						ast_log(LOG_ERROR, "read() from nbio controller failed: %s\n", strerror(err));
					} else {
						/* EOF: errno is meaningless here. */
						ast_log(LOG_ERROR, "read() from nbio controller failed: unexpected end of file\n");
					}
					die = 1;
					break;
				}

				if (c == 'U') {
					/*
					 * Someone wants to modify nbio_array - Do It!
					 * Both arrays may have been replaced, so stop
					 * scanning and go back to poll().
					 */
					ast_log(LOG_DEBUG, "Got NBIO update command\n");
					handle_control();
					break;
				}

				switch (c) {
					case 'D':
						/*
						 * We need to die
						 */
						die = 1;
						break;
					default:
						ast_log(LOG_ERROR, "Unknown NBIO command ascii %d", c);
						break;
				}

			} else {

				if (handle_poll_event(nbio_array[i], poll_array[i].revents) == -1) {
					/*
					 * This poll modified nbio_array - re-poll and don't
					 * scan through nbio_array or poll_array anymore this loop - it's changed!
					 */
					break;
				}

				poll_array[i].revents = 0;

			}

			/* Stop early once every reported event has been handled. */
			if (--ret == 0)
				break;
		}

	}

	ast_mutex_unlock(&nbio_array_lock);

	close(nbio_control_fd);
	nbio_control_fd = -1;

	/* Report the failure recorded above, not whatever errno holds now. */
	if (err)
		ast_log(LOG_ERROR, "NBIO exited with error: %s (errno %d)\n", strerror(err), err);
	else
		ast_log(LOG_NOTICE, "NBIO thread finished\n");
	return NULL;
}

/*
 * Ask the NBIO thread to start monitoring a socket.
 *
 * Appends an NBIO_CONTROL_ADD request for nbio to the control queue and
 * wakes the thread by writing 'U' to the control pipe.  May be called from
 * any thread; the actual registration happens later on the NBIO thread.
 *
 * Returns 0 once the request is queued.  Returns -1 if the request could
 * not be allocated or the thread could not be woken; in that case nothing
 * is left in the queue and the caller keeps ownership of nbio.
 */
static int
add_fd(nbio_socket_t * nbio)
{
	nbio_control_t ** tail;
	nbio_control_t * control = ast_new0(nbio_control_t, 1);

	if (control == NULL) {
		ast_log(LOG_ERROR, "Failed to allocate NBIO control request\n");
		return -1;
	}

	control->type = NBIO_CONTROL_ADD;
	control->ptr = nbio;

	ast_mutex_lock(&nbio_manager);

	/* Walk to the end of the queue so requests are handled in order. */
	for (tail = &nbio_control ; *tail != NULL ; tail = &(*tail)->next)
		;

	*tail = control;

	if (write(nbio_unblock_fd, "U", 1) != 1) {
		ast_log(LOG_ERROR, "Failed to write to NBIO unblock control socket %d: %s\n", nbio_unblock_fd, strerror(errno));

		/*
		 * Nobody will come for this request.  We still hold the lock
		 * and it is the last element, so unlinking is just this.
		 */
		*tail = NULL;
		ast_mutex_unlock(&nbio_manager);
		ast_free(control);
		return -1;
	}

	ast_mutex_unlock(&nbio_manager);
	return 0;
}

/* CLI functions */

/*
 * Map a socket's flags to the label shown by "show nbio sockets".
 *
 * The controller is recognised by an exact flags match; the other types
 * are tested as bits, in the order listen, incoming, outgoing.
 */
static const char *
type2str(nbio_socket_t * nbio)
{
	if (nbio->flags == AST_NBIO_TYPE_CONTROL)
		return "Controller";

	if (nbio->flags & AST_NBIO_TYPE_LISTEN)
		return "Listening";

	if (nbio->flags & AST_NBIO_TYPE_INCOMING)
		return "Incoming";

	return (nbio->flags & AST_NBIO_TYPE_OUTGOING) ? "Outgoing" : "Unknown";
}

/*
 * CLI handler for "show nbio sockets".
 *
 * Prints one line per registered socket (descriptor, description, type)
 * followed by a summary of the nbio_array allocation.
 *
 * fd         - CLI descriptor to print to
 * argc, argv - command words; not used
 *
 * Always returns RESULT_SUCCESS.
 */
static int
show_sockets(int fd, int argc, char *argv[])
{
	int i;

	(void) argc;
	(void) argv;

	ast_mutex_lock(&nbio_manager);
	ast_cli(fd, "\n");
	ast_cli(fd, "  FD Description      Type      \n");
	ast_cli(fd, "---- ---------------- ----------\n");
	for (i = 0 ; i < nbio_array_count ; i++) {
		/* desc may be unset; never hand a NULL pointer to %s. */
		const char * desc = nbio_array[i]->desc ? nbio_array[i]->desc : "(none)";

		ast_cli(fd, "%4d %-16s %-10s\n", nbio_array[i]->fd, desc, type2str(nbio_array[i]));
	}
	ast_cli(fd, "\n");
	/*
	 * nbio_array holds pointers, so its footprint is slots * pointer size.
	 * sizeof yields a size_t; cast to match the conversion specifier.
	 */
	ast_cli(fd, "%d elements (%lu bytes) - %d in use\n", nbio_array_size, (unsigned long) (sizeof(*nbio_array) * nbio_array_size), nbio_array_count);
	ast_cli(fd, "\n");
	ast_mutex_unlock(&nbio_manager);
	return RESULT_SUCCESS;
}

/* Usage text attached to the "show nbio sockets" CLI entry below. */
static char show_sockets_usage[] = 
  "Usage: show nbio sockets\n"
  "       Shows NBIO sockets currently in use\n";

/*
 * CLI entry binding the words "show nbio sockets" to show_sockets().
 * Registered by nbio_start() and unregistered by nbio_stop().
 */
static struct ast_cli_entry
cli_show_sockets =
    { { "show", "nbio", "sockets", NULL }, show_sockets, "Show NBIO sockets", show_sockets_usage };

/* Shuts down the NBIO subsystem */

static void nbio_stop(void)
{
	ast_mutex_lock(&nbio_manager);

	ast_cli_unregister(&cli_show_sockets);

	/*
	 * Only shutdown the NBIO subsystem if it's been started! (although this should't ever be true)
	 */

	if (AST_UNLIKELY(nbio_init != 1)) {
		ast_mutex_unlock(&nbio_manager);
		return;
	}

	if (write(nbio_unblock_fd, "D", 1) != 1) {
		/* The nbio_thread might have crashed? */
		ast_log(LOG_ERROR, "Failed to write to NBIO unblock control socket %d: %s\n", nbio_unblock_fd, strerror(errno));
		ast_mutex_unlock(&nbio_manager);
		return;
	}

	pthread_join(nbio_thread_id, NULL);

	/*
	 * Not nessicary, but hey - be clean. 
	 */

	close(nbio_unblock_fd);
	nbio_unblock_fd = -1;

	nbio_init = 0;

	ast_free(nbio_array);
	nbio_array_size = 0;
	nbio_array_count = 0;

	ast_mutex_unlock(&nbio_manager);
}

/* Starts up the NBIO subsystem. */

static int nbio_start(void)
{
	int res;
	int controllers[2];
	nbio_socket_t * nbio_controller;

	ast_mutex_lock(&nbio_manager);

	if (nbio_init == 1) {
		ast_mutex_unlock(&nbio_manager);
		return 0;
	}

	/*
	 * Work out how many sockets we could possibly have open
	 */

	nbio_maxfd = sysconf(_SC_OPEN_MAX);

	/*
	 * Create the initial structure to hold the fd's
	 */

	nbio_array = ast_new0(nbio_socket_t *, NBIO_DEFAULT_ARRAY_SIZE);
	nbio_array_size = NBIO_DEFAULT_ARRAY_SIZE;

	/*
	 * Create a pipe for controlling the NBIO thread 
	 */

	if (pipe(controllers) != 0) {
		ast_log(LOG_ERROR, "Failed to create NBIO control pipe: %s\n", strerror(errno));
		ast_mutex_unlock(&nbio_manager);
		return -1;
	}

	nbio_control_fd = controllers[0];
	nbio_unblock_fd = controllers[1];

	/*
	 * Start off the NBIO thread 
	 */

	if ((nbio_controller = ast_new0(nbio_socket_t, 1)) == NULL) {
		ast_log(LOG_ERROR, "Failed to allocate new NBIO struct for controller\n");
		close(nbio_control_fd);
		close(nbio_unblock_fd);
		ast_free(nbio_array);
		ast_mutex_unlock(&nbio_manager);
		return -1;
	}

	nbio_array[0] = ast_new0(nbio_socket_t, 1);

	nbio_array[0]->fd = nbio_control_fd;
	nbio_array[0]->flags = AST_NBIO_TYPE_CONTROL;
	nbio_array[0]->desc = strdup("NBIO Controller");

	nbio_array_count = 1;

	ast_log(LOG_DEBUG, "Created %d element NBIO array\n", nbio_array_size);

	poll_array = ast_new0(struct pollfd, 1);
	poll_array[0].fd = nbio_control_fd;
	poll_array[0].events = POLLIN;

	ast_mutex_lock(&nbio_start_lock);
	if ((res = ast_pthread_create(&nbio_thread_id, NULL, nbio_thread, NULL)) != 0) {
		ast_log(LOG_ERROR, "Failed to start the NBIO manager: %s (errno %d)\n", strerror(res), res);
		ast_mutex_unlock(&nbio_start_lock);
		ast_mutex_unlock(&nbio_manager);
		return -1;
	}
	pthread_cond_wait(&nbio_thread_cond, &nbio_start_lock);
	ast_mutex_unlock(&nbio_start_lock);

	nbio_init = 1;
	ast_register_atexit(nbio_stop);
	ast_cli_register(&cli_show_sockets);
	ast_mutex_unlock(&nbio_manager);
	return 0;
}

/*
 * Allocate a zeroed nbio_socket_t and make sure the NBIO subsystem is
 * running.
 *
 * Returns the new structure, or NULL if the subsystem could not be
 * started (the allocation is released again in that case).
 */
static nbio_socket_t *
nbio_socket_new(void)
{
	nbio_socket_t * fresh = ast_new0(nbio_socket_t, 1);

	if (nbio_start() == 0)
		return fresh;

	ast_free(fresh);
	return NULL;
}

/*
 * Send data over a socket to a peer.
 *
 * nbio     - socket to send on; must be an incoming or outgoing socket
 * data_ptr - bytes to send
 * data_len - number of bytes at data_ptr
 *
 * If output is already buffered for this socket the data is appended to
 * that buffer and the result of nbio_buffer_outgoing_append() is returned.
 * Otherwise the data is passed to send() and its result is returned: the
 * number of bytes accepted (which may be fewer than data_len), or -1 on
 * error.  Also returns -1 for sockets of any other type.
 */

int
ast_nbio_send(ast_nbio_t * nbio, void * data_ptr, int data_len)
{
	int len;

	/*
	 * We can only send data on a INCOMING/OUTGOING socket.
	 */

	if (!(nbio->flags & (AST_NBIO_TYPE_INCOMING | AST_NBIO_TYPE_OUTGOING)))
		return -1;

	/*
	 * If outgoing buffer already has some data in it waiting to be sent, then
	 * append this data to it and return.
	 */

	if (nbio->out_buffer_len > 0)
		return nbio_buffer_outgoing_append(nbio, data_ptr, data_len);

	if ((len = send(nbio->fd, data_ptr, data_len, MSG_NOSIGNAL)) == -1) {
		/*
		 * Argh! Error!  The reason is in errno - len is always -1 here.
		 */
		switch (errno) {
			case EAGAIN:
				/* XXX:T: Phew - Just add to buffer. */
				break;
			default:
				/* XXX:T: Run away! kill me! kill me! */
				break;
		}
	}

	return len;
}


/*
 * Registers a listening socket to be monitored for new connections.
 *
 * fd           - a socket that is already listening
 * flags        - AST_NBIO_FLAG_* options, combined with AST_NBIO_TYPE_LISTEN
 * desc         - description shown in "show nbio sockets"; NULL gives
 *                "Unspecified".  The string is copied.
 * callback     - invoked for every accepted connection
 * sockaddr_len - size of the peer address structure for this socket
 *                family; used to size the buffer handed to accept()
 * user_data    - opaque pointer passed through to the callback
 *
 * Starts the NBIO subsystem if necessary.  The socket is only queued here;
 * the NBIO thread picks it up asynchronously.
 *
 * Returns 0 on success, -1 on failure.
 */

int
ast_nbio_listen(int fd, int flags, const char * desc, nbio_connection_cb_t callback, int sockaddr_len, void * user_data)
{
	nbio_socket_t * nbio;

	if (fd < 0)
		return -1;

	// XXX:T: If a protocol specific flag specified - check the protocol (SO_GETPROTO or something)

	if ((nbio = nbio_socket_new()) == NULL)
		return -1;

	nbio->fd = fd;
	nbio->flags = AST_NBIO_TYPE_LISTEN | flags;
	nbio->user_data = user_data;
	nbio->incoming.listen = callback;
	/* Remember the address size; the accept path sizes its buffer from it. */
	nbio->sin_len = sockaddr_len;
	nbio->desc = strdup(desc ? desc : "Unspecified");

	if (nbio->desc == NULL) {
		ast_log(LOG_ERROR, "Failed to allocate description for NBIO listener on fd %d\n", fd);
		ast_free(nbio);
		return -1;
	}

	if (add_fd(nbio) != 0) {
		ast_log(LOG_ERROR, "Failed to add fd to nbio_array\n");
		free(nbio->desc);
		ast_free(nbio);
		return -1;
	}

	ast_log(LOG_DEBUG, "NBIO queued listener on fd %d\n", fd);
	return 0;
}


/*
 * Create a TCP socket and add as a listener
 *
 * addr      - IPv4 address and port to bind to
 * flags     - AST_NBIO_FLAG_* options, passed on to ast_nbio_listen()
 * desc      - description for the listener, passed on to ast_nbio_listen()
 * callback  - invoked for every accepted connection
 * user_data - opaque pointer passed through to the callback
 *
 * The socket is created with SO_REUSEADDR, bound, put into the listening
 * state and registered with NBIO.
 *
 * Returns the listening socket's descriptor, or -1 on failure (in which
 * case the socket, if one was created, has been closed).
 */

int
ast_nbio_listen_tcp(struct sockaddr_in * addr, int flags, const char * desc, nbio_connection_cb_t callback, void * user_data)
{
	int on = 1;
	int sock;

	if (addr == NULL)
		return -1;

	sock = socket(AF_INET, SOCK_STREAM, 0);

	if (sock < 0) {
		ast_log(LOG_ERROR, "Unable to create socket: %s\n", strerror(errno));
		return -1;
	}

	if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) != 0) {
		ast_log(LOG_ERROR, "Failed to setsockopt(SO_REUSEADDR) on socket\n");
		close(sock);
		return -1;
	}

	if (bind(sock, (struct sockaddr *) addr, sizeof(*addr))) {
		ast_log(LOG_WARNING, "Unable to bind socket to address: %s\n", strerror(errno));
		close(sock);
		return -1;
	}

	if (listen(sock, 2)) {
		ast_log(LOG_WARNING, "Unable to listen on socket: %s\n", strerror(errno));
		close(sock);
		return -1;
	}

	if (ast_nbio_listen(sock, flags, desc, callback, sizeof(struct sockaddr_in), user_data) == -1) {
		close(sock);
		return -1;
	}

	return sock;
}

/*
 * Start monitoring an incoming socket (someone connected to us)
 *
 * fd        - the connected socket
 * flags     - AST_NBIO_FLAG_* options, combined with AST_NBIO_TYPE_INCOMING
 * cbs       - callbacks invoked on data, error and close events
 * user_data - opaque pointer passed through to the callbacks
 *
 * Starts the NBIO subsystem if necessary.  The socket is only queued here;
 * the NBIO thread picks it up asynchronously.
 *
 * Returns the new NBIO socket, or NULL on failure.
 */

nbio_socket_t *
ast_nbio_incoming(int fd, int flags, nbio_eventset_incoming_t * cbs, void * user_data)
{
	nbio_socket_t * nbio;

	if (fd < 0)
		return NULL;

	// XXX:T: If a protocol specific flag specified - check the protocol (SO_GETPROTO or something)

	if ((nbio = nbio_socket_new()) == NULL)
		return NULL;

	nbio->fd = fd;
	nbio->flags = AST_NBIO_TYPE_INCOMING | flags;
	nbio->user_data = user_data;
	nbio->cb.incoming = cbs;

	/*
	 * The NBIO thread prints the description as soon as it picks the
	 * socket up, so it has to be set before the socket is queued.
	 */
	nbio->desc = strdup("Unspecified");

	if (nbio->desc == NULL) {
		ast_log(LOG_ERROR, "Failed to allocate description for NBIO socket on fd %d\n", fd);
		ast_free(nbio);
		return NULL;
	}

	if (add_fd(nbio) != 0) {
		ast_log(LOG_ERROR, "Failed to add fd to nbio_array\n");
		free(nbio->desc);
		ast_free(nbio);
		return NULL;
	}

	ast_log(LOG_DEBUG, "NBIO queued incoming on fd %d\n", fd);
	return nbio;
}
