Commit 3a2008ac61dc8b62e121aee71c66b235e0b34756

Con Kolivas 2013-05-19T11:25:45

Use a persistent single separate thread for stratum share submission that uses workqueues since all stratum sends are serialised.

diff --git a/cgminer.c b/cgminer.c
index 3037e6d..1b04fdd 100644
--- a/cgminer.c
+++ b/cgminer.c
@@ -3220,105 +3220,6 @@ static void *submit_work_thread(void *userdata)
 
 	applog(LOG_DEBUG, "Creating extra submit work thread");
 
-	if (stale_work(work, true)) {
-		if (opt_submit_stale)
-			applog(LOG_NOTICE, "Pool %d stale share detected, submitting as user requested", pool->pool_no);
-		else if (pool->submit_old)
-			applog(LOG_NOTICE, "Pool %d stale share detected, submitting as pool requested", pool->pool_no);
-		else {
-			applog(LOG_NOTICE, "Pool %d stale share detected, discarding", pool->pool_no);
-			sharelog("discard", work);
-
-			mutex_lock(&stats_lock);
-			total_stale++;
-			pool->stale_shares++;
-			total_diff_stale += work->work_difficulty;
-			pool->diff_stale += work->work_difficulty;
-			mutex_unlock(&stats_lock);
-
-			goto out;
-		}
-		work->stale = true;
-	}
-
-	if (work->stratum) {
-		struct stratum_share *sshare = calloc(sizeof(struct stratum_share), 1);
-		uint32_t *hash32 = (uint32_t *)work->hash, nonce;
-		bool submitted = false;
-		char *noncehex;
-		char s[1024];
-
-		sshare->sshare_time = time(NULL);
-		/* This work item is freed in parse_stratum_response */
-		sshare->work = work;
-		nonce = *((uint32_t *)(work->data + 76));
-		noncehex = bin2hex((const unsigned char *)&nonce, 4);
-		memset(s, 0, 1024);
-
-		mutex_lock(&sshare_lock);
-		/* Give the stratum share a unique id */
-		sshare->id = swork_id++;
-		mutex_unlock(&sshare_lock);
-
-		sprintf(s, "{\"params\": [\"%s\", \"%s\", \"%s\", \"%s\", \"%s\"], \"id\": %d, \"method\": \"mining.submit\"}",
-			pool->rpc_user, work->job_id, work->nonce2, work->ntime, noncehex, sshare->id);
-		free(noncehex);
-
-		applog(LOG_INFO, "Submitting share %08lx to pool %d",
-					(long unsigned int)htole32(hash32[6]), pool->pool_no);
-
-		/* Try resubmitting for up to 2 minutes if we fail to submit
-		 * once and the stratum pool nonce1 still matches suggesting
-		 * we may be able to resume. */
-		while (time(NULL) < sshare->sshare_time + 120) {
-			bool sessionid_match;
-
-			if (likely(stratum_send(pool, s, strlen(s)))) {
-				if (pool_tclear(pool, &pool->submit_fail))
-						applog(LOG_WARNING, "Pool %d communication resumed, submitting work", pool->pool_no);
-
-				mutex_lock(&sshare_lock);
-				HASH_ADD_INT(stratum_shares, id, sshare);
-				pool->sshares++;
-				mutex_unlock(&sshare_lock);
-
-				applog(LOG_DEBUG, "Successfully submitted, adding to stratum_shares db");
-				submitted = true;
-				break;
-			}
-			if (!pool_tset(pool, &pool->submit_fail) && cnx_needed(pool)) {
-				applog(LOG_WARNING, "Pool %d stratum share submission failure", pool->pool_no);
-				total_ro++;
-				pool->remotefail_occasions++;
-			}
-
-			if (opt_lowmem) {
-				applog(LOG_DEBUG, "Lowmem option prevents resubmitting stratum share");
-				break;
-			}
-
-			cg_rlock(&pool->data_lock);
-			sessionid_match = (pool->nonce1 && !strcmp(work->nonce1, pool->nonce1));
-			cg_runlock(&pool->data_lock);
-
-			if (!sessionid_match) {
-				applog(LOG_DEBUG, "No matching session id for resubmitting stratum share");
-				break;
-			}
-			/* Retry every 5 seconds */
-			sleep(5);
-		}
-
-		if (unlikely(!submitted)) {
-			applog(LOG_DEBUG, "Failed to submit stratum share, discarding");
-			free_work(work);
-			free(sshare);
-			pool->stale_shares++;
-			total_stale++;
-		}
-		goto out;
-	}
-
 	ce = pop_curl_entry(pool);
 	/* submit solution to bitcoin via JSON-RPC */
 	while (!submit_upstream_work(work, ce->curl, resubmit)) {
@@ -3337,6 +3238,7 @@ static void *submit_work_thread(void *userdata)
 			pool->diff_stale += work->work_difficulty;
 			mutex_unlock(&stats_lock);
 
+			free_work(work);
 			break;
 		}
 
@@ -3344,7 +3246,7 @@ static void *submit_work_thread(void *userdata)
 		applog(LOG_INFO, "json_rpc_call failed on submit_work, retrying");
 	}
 	push_curl_entry(ce, pool);
-out:
+
 	return NULL;
 }
 
@@ -4909,18 +4811,18 @@ static bool supports_resume(struct pool *pool)
 	return ret;
 }
 
-/* One stratum thread per pool that has stratum waits on the socket checking
- * for new messages and for the integrity of the socket connection. We reset
- * the connection based on the integrity of the receive side only as the send
- * side will eventually expire data it fails to send. */
-static void *stratum_thread(void *userdata)
+/* One stratum receive thread per pool that has stratum waits on the socket
+ * checking for new messages and for the integrity of the socket connection. We
+ * reset the connection based on the integrity of the receive side only as the
+ * send side will eventually expire data it fails to send. */
+static void *stratum_rthread(void *userdata)
 {
 	struct pool *pool = (struct pool *)userdata;
 	char threadname[16];
 
 	pthread_detach(pthread_self());
 
-	snprintf(threadname, 16, "stratum/%d", pool->pool_no);
+	snprintf(threadname, 16, "StratumR/%d", pool->pool_no);
 	RenameThread(threadname);
 
 	while (42) {
@@ -5023,10 +4925,125 @@ out:
 	return NULL;
 }
 
-static void init_stratum_thread(struct pool *pool)
+/* Each pool has one stratum send thread for sending shares to avoid many
+ * threads being created for submission since all sends need to be serialised
+ * anyway. */
+static void *stratum_sthread(void *userdata)
 {
-	if (unlikely(pthread_create(&pool->stratum_thread, NULL, stratum_thread, (void *)pool)))
-		quit(1, "Failed to create stratum thread");
+	struct pool *pool = (struct pool *)userdata;
+	char threadname[16];
+
+	pthread_detach(pthread_self());
+
+	snprintf(threadname, 16, "StratumS/%d", pool->pool_no);
+	RenameThread(threadname);
+
+	pool->stratum_q = tq_new();
+	if (!pool->stratum_q)
+		quit(1, "Failed to create stratum_q in stratum_sthread");
+
+	while (42) {
+		struct stratum_share *sshare;
+		uint32_t *hash32, nonce;
+		struct work *work;
+		bool submitted;
+		char *noncehex;
+		char s[1024];
+
+		if (unlikely(pool->removed))
+			break;
+
+		work = tq_pop(pool->stratum_q, NULL);
+		if (unlikely(!work))
+			quit(1, "Stratum q returned empty work");
+
+		sshare = calloc(sizeof(struct stratum_share), 1);
+		hash32 = (uint32_t *)work->hash;
+		submitted = false;
+
+		sshare->sshare_time = time(NULL);
+		/* This work item is freed in parse_stratum_response */
+		sshare->work = work;
+		nonce = *((uint32_t *)(work->data + 76));
+		noncehex = bin2hex((const unsigned char *)&nonce, 4);
+		memset(s, 0, 1024);
+
+		mutex_lock(&sshare_lock);
+		/* Give the stratum share a unique id */
+		sshare->id = swork_id++;
+		mutex_unlock(&sshare_lock);
+
+		sprintf(s, "{\"params\": [\"%s\", \"%s\", \"%s\", \"%s\", \"%s\"], \"id\": %d, \"method\": \"mining.submit\"}",
+			pool->rpc_user, work->job_id, work->nonce2, work->ntime, noncehex, sshare->id);
+		free(noncehex);
+
+		applog(LOG_INFO, "Submitting share %08lx to pool %d",
+					(long unsigned int)htole32(hash32[6]), pool->pool_no);
+
+		/* Try resubmitting for up to 2 minutes if we fail to submit
+		 * once and the stratum pool nonce1 still matches suggesting
+		 * we may be able to resume. */
+		while (time(NULL) < sshare->sshare_time + 120) {
+			bool sessionid_match;
+
+			if (likely(stratum_send(pool, s, strlen(s)))) {
+				if (pool_tclear(pool, &pool->submit_fail))
+						applog(LOG_WARNING, "Pool %d communication resumed, submitting work", pool->pool_no);
+
+				mutex_lock(&sshare_lock);
+				HASH_ADD_INT(stratum_shares, id, sshare);
+				pool->sshares++;
+				mutex_unlock(&sshare_lock);
+
+				applog(LOG_DEBUG, "Successfully submitted, adding to stratum_shares db");
+				submitted = true;
+				break;
+			}
+			if (!pool_tset(pool, &pool->submit_fail) && cnx_needed(pool)) {
+				applog(LOG_WARNING, "Pool %d stratum share submission failure", pool->pool_no);
+				total_ro++;
+				pool->remotefail_occasions++;
+			}
+
+			if (opt_lowmem) {
+				applog(LOG_DEBUG, "Lowmem option prevents resubmitting stratum share");
+				break;
+			}
+
+			cg_rlock(&pool->data_lock);
+			sessionid_match = (pool->nonce1 && !strcmp(work->nonce1, pool->nonce1));
+			cg_runlock(&pool->data_lock);
+
+			if (!sessionid_match) {
+				applog(LOG_DEBUG, "No matching session id for resubmitting stratum share");
+				break;
+			}
+			/* Retry every 5 seconds */
+			sleep(5);
+		}
+
+		if (unlikely(!submitted)) {
+			applog(LOG_DEBUG, "Failed to submit stratum share, discarding");
+			free_work(work);
+			free(sshare);
+			pool->stale_shares++;
+			total_stale++;
+		}
+	}
+
+	/* Freeze the work queue but don't free up its memory in case there is
+	 * work still trying to be submitted to the removed pool. */
+	tq_freeze(pool->stratum_q);
+
+	return NULL;
+}
+
+static void init_stratum_threads(struct pool *pool)
+{
+	if (unlikely(pthread_create(&pool->stratum_sthread, NULL, stratum_sthread, (void *)pool)))
+		quit(1, "Failed to create stratum sthread");
+	if (unlikely(pthread_create(&pool->stratum_rthread, NULL, stratum_rthread, (void *)pool)))
+		quit(1, "Failed to create stratum rthread");
 }
 
 static void *longpoll_thread(void *userdata);
@@ -5069,7 +5086,7 @@ retry_stratum:
 			bool ret = initiate_stratum(pool) && auth_stratum(pool);
 
 			if (ret)
-				init_stratum_thread(pool);
+				init_stratum_threads(pool);
 			else
 				pool_tclear(pool, &pool->stratum_init);
 			return ret;
@@ -5476,13 +5493,45 @@ static struct work *get_work(struct thr_info *thr, const int thr_id)
 void submit_work_async(struct work *work_in, struct timeval *tv_work_found)
 {
 	struct work *work = copy_work(work_in);
+	struct pool *pool = work->pool;
 	pthread_t submit_thread;
 
 	if (tv_work_found)
 		copy_time(&work->tv_work_found, tv_work_found);
-	applog(LOG_DEBUG, "Pushing submit work to work thread");
-	if (unlikely(pthread_create(&submit_thread, NULL, submit_work_thread, (void *)work)))
-		quit(1, "Failed to create submit_work_thread");
+
+	if (stale_work(work, true)) {
+		if (opt_submit_stale)
+			applog(LOG_NOTICE, "Pool %d stale share detected, submitting as user requested", pool->pool_no);
+		else if (pool->submit_old)
+			applog(LOG_NOTICE, "Pool %d stale share detected, submitting as pool requested", pool->pool_no);
+		else {
+			applog(LOG_NOTICE, "Pool %d stale share detected, discarding", pool->pool_no);
+			sharelog("discard", work);
+
+			mutex_lock(&stats_lock);
+			total_stale++;
+			pool->stale_shares++;
+			total_diff_stale += work->work_difficulty;
+			pool->diff_stale += work->work_difficulty;
+			mutex_unlock(&stats_lock);
+
+			free_work(work);
+			return;
+		}
+		work->stale = true;
+	}
+
+	if (work->stratum) {
+		applog(LOG_DEBUG, "Pushing pool %d work to stratum queue", pool->pool_no);
+		if (unlikely(!tq_push(pool->stratum_q, work))) {
+			applog(LOG_DEBUG, "Discarding work from removed pool");
+			free_work(work);
+		}
+	} else {
+		applog(LOG_DEBUG, "Pushing submit work to work thread");
+		if (unlikely(pthread_create(&submit_thread, NULL, submit_work_thread, (void *)work)))
+			quit(1, "Failed to create submit_work_thread");
+	}
 }
 
 void inc_hw_errors(struct thr_info *thr)
diff --git a/miner.h b/miner.h
index 471b372..2e6da64 100644
--- a/miner.h
+++ b/miner.h
@@ -1126,8 +1126,10 @@ struct pool {
 	bool stratum_init;
 	bool stratum_notify;
 	struct stratum_work swork;
-	pthread_t stratum_thread;
+	pthread_t stratum_sthread;
+	pthread_t stratum_rthread;
 	pthread_mutex_t stratum_lock;
+	struct thread_q *stratum_q;
 	int sshares; /* stratum shares submitted waiting on response */
 
 	/* GBT  variables */