Commit 56255a0c86064c3b7810f6892e136f93076ae9cd

Con Kolivas 2012-09-28T18:01:33

Create one stratum thread for each pool that has stratum; the thread monitors the socket and serves received data.

diff --git a/cgminer.c b/cgminer.c
index ac4b85f..0aba950 100644
--- a/cgminer.c
+++ b/cgminer.c
@@ -3957,6 +3957,50 @@ out_unlock:
 	}
 }
 
+/* One detached thread per stratum pool (userdata is its struct pool) waits on
+ * the socket for new messages and checks the connection. Only a receive-side
+ * select() failure resets the connection, as the send side will eventually
+ * expire data it fails to send. Exits, returning NULL, once the pool dies. */
+static void *stratum_thread(void *userdata)
+{
+	struct pool *pool = (struct pool *)userdata;
+
+	pthread_detach(pthread_self());
+
+	while (42) {
+		fd_set rd;
+		char *s;
+
+		/* select() modifies the set and a reconnect presumably replaces
+		 * pool->sock, so build the set from the pool on every pass */
+		FD_ZERO(&rd);
+		FD_SET(pool->sock, &rd);
+		if (select(pool->sock + 1, &rd, NULL, NULL, NULL) < 0) {
+			pool->stratum_active = pool->stratum_auth = false;
+			applog(LOG_WARNING, "Stratum connection to pool %d interrupted", pool->pool_no);
+			pool->getfail_occasions++;
+			if (!initiate_stratum(pool) || !auth_stratum(pool)) {
+				pool_died(pool);
+				break;
+			}
+		}
+		s = recv_line(pool->sock);
+		if (unlikely(!s))
+			continue;
+		if (!parse_stratum(pool, s)) /* Create message queues here */
+			applog(LOG_INFO, "Unknown stratum msg: %s", s);
+		free(s);
+	}
+
+	return NULL;
+}
+
+static void init_stratum_thread(struct pool *pool)
+{	/* Start stratum_thread() for this pool; failing to create it is fatal */
+	if (unlikely(pthread_create(&pool->stratum_thread, NULL, stratum_thread, pool) != 0))
+		quit(1, "Failed to create stratum thread");
+}
+
 static void *longpoll_thread(void *userdata);
 
 static bool pool_active(struct pool *pool, bool pinging)
@@ -3970,9 +4014,15 @@ static bool pool_active(struct pool *pool, bool pinging)
 	if (pool->has_stratum) {
 		if ((!pool->stratum_active || pinging) && !initiate_stratum(pool))
 			return false;
-		if (!pool->stratum_auth && !auth_stratum(pool))
-			return false;
-		return false;
+		if (!pool->stratum_auth) {
+			if (!auth_stratum(pool))
+				return false;
+			/* We create the stratum thread for each pool just
+			 * after successful authorisation */
+			init_stratum_thread(pool);
+			return true;
+		}
+		return true;
 	}
 
 	curl = curl_easy_init();
@@ -5473,7 +5523,9 @@ int main(int argc, char *argv[])
 	sigemptyset(&handler.sa_mask);
 	sigaction(SIGTERM, &handler, &termhandler);
 	sigaction(SIGINT, &handler, &inthandler);
-
+#ifndef WIN32
+	signal(SIGPIPE, SIG_IGN);
+#endif
 	opt_kernel_path = alloca(PATH_MAX);
 	strcpy(opt_kernel_path, CGMINER_PREFIX);
 	cgminer_path = alloca(PATH_MAX);
diff --git a/miner.h b/miner.h
index c099012..fc4ba8c 100644
--- a/miner.h
+++ b/miner.h
@@ -842,6 +842,7 @@ struct pool {
 	bool stratum_active;
 	bool stratum_auth;
 	struct stratum_work swork;
+	pthread_t stratum_thread;
 };
 
 #define GETWORK_MODE_TESTPOOL 'T'
diff --git a/util.c b/util.c
index 0aeadf6..aa5c709 100644
--- a/util.c
+++ b/util.c
@@ -897,7 +897,7 @@ static bool sock_full(SOCKETTYPE sock, bool wait)
 
 /* Peeks at a socket to find the first end of line and then reads just that
  * from the socket and returns that as a malloced char */
-static char *recv_line(SOCKETTYPE sock)
+char *recv_line(SOCKETTYPE sock)
 {
 	char *sret = NULL, *s;
 	ssize_t len;
@@ -1048,7 +1048,7 @@ static bool parse_diff(struct pool *pool, json_t *val)
 	return true;
 }
 
-static bool parse_stratum(struct pool *pool, char *s)
+bool parse_stratum(struct pool *pool, char *s)
 {
 	json_t *val = NULL, *method, *err_val, *params;
 	json_error_t err;
diff --git a/util.h b/util.h
index 3962f03..4eb51a7 100644
--- a/util.h
+++ b/util.h
@@ -109,6 +109,8 @@
 	#endif
 #endif
 struct pool;
+char *recv_line(SOCKETTYPE sock);
+bool parse_stratum(struct pool *pool, char *s);
 bool extract_sockaddr(struct pool *pool, char *url);
 bool auth_stratum(struct pool *pool);
 bool initiate_stratum(struct pool *pool);