Commit 7d288eac9fa1e541343e957ca86c2c52ae316f28

Con Kolivas 2012-05-01T19:59:37

Implement a scalable networking framework designed to cope with any sized network requirements, yet minimise the number of connections being reopened. Do this by creating a ring buffer linked list of curl handles to be used by getwork, recruiting extra handles when none is immediately available.

diff --git a/cgminer.c b/cgminer.c
index ad8ad96..ef0187c 100644
--- a/cgminer.c
+++ b/cgminer.c
@@ -379,7 +379,6 @@ static void sharelog(const char*disposition, const struct work*work)
 }
 
 static void *submit_work_thread(void *userdata);
-static void *get_work_thread(void *userdata);
 
 static void add_pool(void)
 {
@@ -392,13 +391,13 @@ static void add_pool(void)
 	pools[total_pools++] = pool;
 	if (unlikely(pthread_mutex_init(&pool->pool_lock, NULL)))
 		quit(1, "Failed to pthread_mutex_init in add_pool");
+	INIT_LIST_HEAD(&pool->curlring);
+
 	/* Make sure the pool doesn't think we've been idle since time 0 */
 	pool->tv_idle.tv_sec = ~0UL;
 
 	if (unlikely(pthread_create(&pool->submit_thread, NULL, submit_work_thread, (void *)pool)))
 		quit(1, "Failed to create pool submit thread");
-	if (unlikely(pthread_create(&pool->getwork_thread, NULL, get_work_thread, (void *)pool)))
-		quit(1, "Failed to create pool getwork thread");
 }
 
 /* Pool variant of test and set */
@@ -1957,77 +1956,46 @@ static void sighandler(int __maybe_unused sig)
 	kill_work();
 }
 
-/* One get work thread is created per pool, so as to use one curl handle for
- * all getwork reqeusts from the same pool, minimising connections opened, but
- * separate from the submit work curl handle to not delay share submissions due
- * to getwork traffic */
-static void *get_work_thread(void *userdata)
+/* Add a new curl handle to pool->curlring. Called with pool_lock held */
+static void recruit_curl(struct pool *pool)
 {
-	struct pool *pool = (struct pool *)userdata;
-	struct workio_cmd *wc;
-	CURL *curl;
-
-	pthread_detach(pthread_self());
-
-	/* getwork_q memory never freed */
-	pool->getwork_q = tq_new();
-	if (!pool->getwork_q)
-		quit(1, "Failed to tq_new in get_work_thread");
-
-	curl = curl_easy_init();
-	if (unlikely(!curl))
-		quit(1, "Failed to initialise pool getwork CURL");
+	struct curl_ent *ce = calloc(1, sizeof(*ce));
 
-	while ((wc = tq_pop(pool->getwork_q, NULL)) != NULL) {
-		struct work *ret_work;
-		int failures = 0;
+	/* Test ce before storing into it: a failed calloc must not be dereferenced */
+	if (unlikely(!ce || !(ce->curl = curl_easy_init())))
+		quit(1, "Failed to init in recruit_curl");
 
-		ret_work = make_work();
-
-		if (wc->thr)
-			ret_work->thr = wc->thr;
-		else
-			ret_work->thr = NULL;
-
-		ret_work->pool = pool;
-
-		/* obtain new work from bitcoin via JSON-RPC */
-		while (!get_upstream_work(ret_work, curl)) {
-			if (unlikely((opt_retries >= 0) && (++failures > opt_retries))) {
-				applog(LOG_ERR, "json_rpc_call failed, terminating workio thread");
-				free_work(ret_work);
-				kill_work();
-				break;
-			}
+	list_add(&ce->node, &pool->curlring);
+	applog(LOG_DEBUG, "Recruited new curl for pool %d", pool->pool_no);
+}
 
-			/* pause, then restart work-request loop */
-			applog(LOG_DEBUG, "json_rpc_call failed on get work, retry after %d seconds",
-				fail_pause);
-			sleep(fail_pause);
-			fail_pause += opt_fail_pause;
-		}
-		fail_pause = opt_fail_pause;
+static struct curl_ent *pop_curl_entry(struct pool *pool)
+{
+	struct curl_ent *ce;
 
-		applog(LOG_DEBUG, "Pushing work to requesting thread");
+	mutex_lock(&pool->pool_lock);
+	if (list_empty(&pool->curlring))
+		recruit_curl(pool); /* no idle handle: add one while holding the lock */
+	ce = list_entry(pool->curlring.next, struct curl_ent, node);
+	list_del(&ce->node); /* unlink so no other thread can pick this handle */
+	mutex_unlock(&pool->pool_lock);
 
-		/* send work to requesting thread */
-		if (unlikely(!tq_push(thr_info[stage_thr_id].q, ret_work))) {
-			applog(LOG_ERR, "Failed to tq_push work in workio_get_work");
-			kill_work();
-			free_work(ret_work);
-		}
-		workio_cmd_free(wc);
-	}
+	return ce;
+}
 
-	curl_easy_cleanup(curl);
-	return NULL;
+static void push_curl_entry(struct curl_ent *ce, struct pool *pool)
+{
+	mutex_lock(&pool->pool_lock);
+	list_add(&ce->node, &pool->curlring);
+	mutex_unlock(&pool->pool_lock);
 }
 
-static void *get_extra_work(void *userdata)
+static void *get_work_thread(void *userdata)
 {
 	struct workio_cmd *wc = (struct workio_cmd *)userdata;
-	struct work *ret_work = make_work();;
-	CURL *curl = curl_easy_init();
+	struct work *ret_work = make_work();
+	struct curl_ent *ce;
+	struct pool *pool;
 	int failures = 0;
 
 	pthread_detach(pthread_self());
@@ -2039,10 +2007,11 @@ static void *get_extra_work(void *userdata)
 	else
 		ret_work->thr = NULL;
 
-	ret_work->pool = select_pool(wc->lagging);
+	pool = ret_work->pool = select_pool(wc->lagging);
+	ce = pop_curl_entry(pool);
 
 	/* obtain new work from bitcoin via JSON-RPC */
-	while (!get_upstream_work(ret_work, curl)) {
+	while (!get_upstream_work(ret_work, ce->curl)) {
 		if (unlikely((opt_retries >= 0) && (++failures > opt_retries))) {
 			applog(LOG_ERR, "json_rpc_call failed, terminating workio thread");
 			free_work(ret_work);
@@ -2069,7 +2038,7 @@ static void *get_extra_work(void *userdata)
 
 out:
 	workio_cmd_free(wc);
-	curl_easy_cleanup(curl);
+	push_curl_entry(ce, pool);
 	return NULL;
 }
 
@@ -2078,13 +2047,9 @@ out:
  * requests */
 static bool workio_get_work(struct workio_cmd *wc)
 {
-	struct pool *pool = select_pool(wc->lagging);
 	pthread_t get_thread;
 
-	if (list_empty(&pool->getwork_q->q) || pool->submit_fail)
-		return tq_push(pool->getwork_q, wc);
-
-	if (unlikely(pthread_create(&get_thread, NULL, get_extra_work, (void *)wc))) {
+	if (unlikely(pthread_create(&get_thread, NULL, get_work_thread, (void *)wc))) {
 		applog(LOG_ERR, "Failed to create get_work_thread");
 		return false;
 	}
@@ -4450,6 +4415,8 @@ int add_pool_details(bool live, char *url, char *user, char *pass)
 	pool->prio = total_pools;
 	if (unlikely(pthread_mutex_init(&pool->pool_lock, NULL)))
 		quit (1, "Failed to pthread_mutex_init in input_pool");
+	INIT_LIST_HEAD(&pool->curlring);
+
 	pool->rpc_url = url;
 	pool->rpc_user = user;
 	pool->rpc_pass = pass;
@@ -4462,8 +4429,6 @@ int add_pool_details(bool live, char *url, char *user, char *pass)
 
 	if (unlikely(pthread_create(&pool->submit_thread, NULL, submit_work_thread, (void *)pool)))
 		quit(1, "Failed to create pool submit thread");
-	if (unlikely(pthread_create(&pool->getwork_thread, NULL, get_work_thread, (void *)pool)))
-		quit(1, "Failed to create pool getwork thread");
 
 	/* Test the pool is not idle if we're live running, otherwise
 	 * it will be tested separately */
diff --git a/miner.h b/miner.h
index 6c750be..3f6f9ba 100644
--- a/miner.h
+++ b/miner.h
@@ -598,6 +598,11 @@ typedef struct {
 } dev_blk_ctx;
 #endif
 
+struct curl_ent {
+	CURL *curl;
+	struct list_head node;
+};
+
 struct pool {
 	int pool_no;
 	int prio;
@@ -634,6 +639,8 @@ struct pool {
 	pthread_t longpoll_thread;
 	pthread_t submit_thread;
 	pthread_t getwork_thread;
+
+	struct list_head curlring;
 };
 
 struct work {