Commit 5ad942d187c02cef0cf7a435f9e95b943799d63a

Con Kolivas 2012-04-29T00:56:23

Faster hardware can easily outstrip the speed we can get work and submit shares when using only one connection per pool. Test the queued list to see if any get/submits are already queued and if they are, start recruiting extra connections by generating new threads. This allows us to reuse network connections at low loads but recruit new open connections as they're needed, so that cgminer can scale to hardware of any size.

diff --git a/cgminer.c b/cgminer.c
index ef13b0d..af53532 100644
--- a/cgminer.c
+++ b/cgminer.c
@@ -1585,7 +1585,7 @@ bool regeneratehash(const struct work *work)
 		return false;
 }
 
-static bool submit_upstream_work(const struct work *work)
+static bool submit_upstream_work(const struct work *work, CURL *curl)
 {
 	char *hexstr = NULL;
 	json_t *val, *res;
@@ -1594,7 +1594,6 @@ static bool submit_upstream_work(const struct work *work)
 	int thr_id = work->thr_id;
 	struct cgpu_info *cgpu = thr_info[thr_id].cgpu;
 	struct pool *pool = work->pool;
-	CURL *curl = pool->submit_curl;
 	bool rolltime;
 	uint32_t *hash32;
 	char hashshow[64+1] = "";
@@ -1770,10 +1769,9 @@ static void get_benchmark_work(struct work *work)
 	memcpy(work, &bench_block, min_size);
 }
 
-static bool get_upstream_work(struct work *work)
+static bool get_upstream_work(struct work *work, CURL *curl)
 {
 	struct pool *pool = work->pool;
-	CURL *curl = pool->getwork_curl;
 	json_t *val = NULL;
 	bool rc = false;
 	int retries = 0;
@@ -1996,7 +1994,7 @@ static void *get_work_thread(void *userdata)
 		ret_work->pool = pool;
 
 		/* obtain new work from bitcoin via JSON-RPC */
-		while (!get_upstream_work(ret_work)) {
+		while (!get_upstream_work(ret_work, pool->getwork_curl)) {
 			if (unlikely((opt_retries >= 0) && (++failures > opt_retries))) {
 				applog(LOG_ERR, "json_rpc_call failed, terminating workio thread");
 				free_work(ret_work);
@@ -2026,11 +2024,72 @@ static void *get_work_thread(void *userdata)
 	return NULL;
 }
 
+static void *get_extra_work(void *userdata)
+{
+	struct workio_cmd *wc = (struct workio_cmd *)userdata;
+	struct work *ret_work = make_work();
+	CURL *curl = curl_easy_init();
+	int failures = 0;
+
+	pthread_detach(pthread_self());
+
+	applog(LOG_DEBUG, "Creating extra get work thread");
+
+	if (wc->thr)
+		ret_work->thr = wc->thr;
+	else
+		ret_work->thr = NULL;
+
+	ret_work->pool = select_pool(wc->lagging);
+
+	/* obtain new work from bitcoin via JSON-RPC */
+	while (!get_upstream_work(ret_work, curl)) {
+		if (unlikely((opt_retries >= 0) && (++failures > opt_retries))) {
+			applog(LOG_ERR, "json_rpc_call failed, terminating workio thread");
+			free_work(ret_work);
+			kill_work();
+			goto out;
+		}
+
+		/* pause, then restart work-request loop */
+		applog(LOG_DEBUG, "json_rpc_call failed on get work, retry after %d seconds",
+			fail_pause);
+		sleep(fail_pause);
+		fail_pause += opt_fail_pause;
+	}
+	fail_pause = opt_fail_pause;
+
+	applog(LOG_DEBUG, "Pushing work to requesting thread");
+
+	/* send work to requesting thread */
+	if (unlikely(!tq_push(thr_info[stage_thr_id].q, ret_work))) {
+		applog(LOG_ERR, "Failed to tq_push work in workio_get_work");
+		kill_work();
+		free_work(ret_work);
+	}
+
+out:
+	workio_cmd_free(wc);
+	curl_easy_cleanup(curl);
+	return NULL;
+}
+
+/* As per the submit work system, we try to reuse the existing curl handles,
+ * but start recruiting extra connections if we start accumulating queued
+ * requests */
 static bool workio_get_work(struct workio_cmd *wc)
 {
 	struct pool *pool = select_pool(wc->lagging);
+	pthread_t get_thread;
 
-	return tq_push(pool->getwork_q, wc);
+	if (list_empty(&pool->getwork_q->q))
+		return tq_push(pool->getwork_q, wc);
+
+	if (unlikely(pthread_create(&get_thread, NULL, get_extra_work, (void *)wc))) {
+		applog(LOG_ERR, "Failed to create get_extra_work thread");
+		return false;
+	}
+	return true;
 }
 
 static bool stale_work(struct work *work, bool share)
@@ -2100,7 +2159,7 @@ static void *submit_work_thread(void *userdata)
 		}
 
 		/* submit solution to bitcoin via JSON-RPC */
-		while (!submit_upstream_work(work)) {
+		while (!submit_upstream_work(work, pool->submit_curl)) {
 			if (!opt_submit_stale && stale_work(work, true) && !pool->submit_old) {
 				applog(LOG_NOTICE, "Stale share detected on submit retry, discarding");
 				total_stale++;
@@ -2126,9 +2185,75 @@ static void *submit_work_thread(void *userdata)
 	return NULL;
 }
 
+static void *submit_extra_work(void *userdata)
+{
+	struct workio_cmd *wc = (struct workio_cmd *)userdata;
+	struct work *work = wc->u.work;
+	struct pool *pool = work->pool;
+	CURL *curl = curl_easy_init();
+	int failures = 0;
+
+	pthread_detach(pthread_self());
+
+	applog(LOG_DEBUG, "Creating extra submit work thread");
+
+	if (stale_work(work, true)) {
+		if (opt_submit_stale)
+			applog(LOG_NOTICE, "Stale share detected, submitting as user requested");
+		else if (pool->submit_old)
+			applog(LOG_NOTICE, "Stale share detected, submitting as pool requested");
+		else {
+			applog(LOG_NOTICE, "Stale share detected, discarding");
+			sharelog("discard", work);
+			total_stale++;
+			pool->stale_shares++;
+			goto out;
+		}
+	}
+
+	/* submit solution to bitcoin via JSON-RPC */
+	while (!submit_upstream_work(work, curl)) {
+		if (!opt_submit_stale && stale_work(work, true)) {
+			applog(LOG_NOTICE, "Stale share detected, discarding");
+			total_stale++;
+			pool->stale_shares++;
+			break;
+		}
+		if (unlikely((opt_retries >= 0) && (++failures > opt_retries))) {
+			applog(LOG_ERR, "Failed %d retries ...terminating workio thread", opt_retries);
+			kill_work();
+			break;
+		}
+
+		/* pause, then restart work-request loop */
+		applog(LOG_INFO, "json_rpc_call failed on submit_work, retry after %d seconds",
+			fail_pause);
+		sleep(fail_pause);
+		fail_pause += opt_fail_pause;
+	}
+	fail_pause = opt_fail_pause;
+out:
+	workio_cmd_free(wc);
+	curl_easy_cleanup(curl);
+	return NULL;
+}
+
+/* We try to reuse curl handles as much as possible, but if there is already
+ * work queued to be submitted, we start generating extra handles to submit
+ * the shares to avoid ever increasing backlogs. This allows us to scale to
+ * any size hardware */
 static bool workio_submit_work(struct workio_cmd *wc)
 {
-	return tq_push(wc->u.work->pool->submit_q, wc);
+	pthread_t submit_thread;
+
+	if (list_empty(&wc->u.work->pool->submit_q->q))
+		return tq_push(wc->u.work->pool->submit_q, wc);
+
+	if (unlikely(pthread_create(&submit_thread, NULL, submit_extra_work, (void *)wc))) {
+		applog(LOG_ERR, "Failed to create submit_extra_work thread");
+		return false;
+	}
+	return true;
 }
 
 /* Find the pool that currently has the highest priority */