Commit 6305146bea194c6f9f01ab49352e2dd47ac45986

Con Kolivas 2011-07-20T13:07:41

Implement proper flagging of idle pools, test them with the watchdog thread, and failover correctly.

diff --git a/main.c b/main.c
index ec069b7..c365e17 100644
--- a/main.c
+++ b/main.c
@@ -235,9 +235,8 @@ static void add_pool(void)
 		applog(LOG_ERR, "Failed to pthread_mutex_init in add_pool");
 		exit (1);
 	}
-	/* Make sure the pool doesn't think we've been idle since time 0 if
-	 * we rush to !localgen */
-	pool->tv_localgen.tv_sec = ~0UL;
+	/* Make sure the pool doesn't think we've been idle since time 0 */
+	pool->tv_idle.tv_sec = ~0UL;
 }
 
 /* Pool variant of test and set */
@@ -273,23 +272,6 @@ static struct pool *current_pool(void)
 	return pool;
 }
 
-static void switch_pools(void)
-{
-	pthread_mutex_lock(&control_lock);
-	pool_no++;
-	if (pool_no >= total_pools)
-		pool_no = 0;
-	currentpool = &pools[pool_no];
-	gettimeofday(&currentpool->tv_localgen, NULL);
-	applog(LOG_WARNING, "Prolonged outage. Attempting to switch to %s", currentpool->rpc_url);
-	pthread_mutex_unlock(&control_lock);
-
-	/* Reset the queued amount to allow more to be queued for the new pool */
-	pthread_mutex_lock(&qd_lock);
-	total_queued = 0;
-	pthread_mutex_unlock(&qd_lock);
-}
-
 /* FIXME: Use asprintf for better errors. */
 static char *set_algo(const char *arg, enum sha256_algos *algo)
 {
@@ -843,11 +825,11 @@ static bool submit_upstream_work(const struct work *work)
 		if (!pool_tset(pool, &pool->submit_fail)) {
 			total_ro++;
 			pool->remotefail_occasions++;
-			applog(LOG_WARNING, "Upstream communication failure, caching submissions");
+			applog(LOG_WARNING, "Pool %d communication failure, caching submissions", pool->pool_no);
 		}
 		goto out;
 	} else if (pool_tclear(pool, &pool->submit_fail))
-		applog(LOG_WARNING, "Upstream communication resumed, submitting work");
+		applog(LOG_WARNING, "Pool %d communication resumed, submitting work", pool->pool_no);
 
 	res = json_object_get(val, "result");
 
@@ -1144,11 +1126,9 @@ static void inc_staged(struct pool *pool, int inc, bool lp)
 	if (lp) {
 		lp_staged += inc;
 		total_staged += inc;
-		pool->idle = true;
-	} else if (lp_staged) {
-		if (!--lp_staged)
-			pool->idle = false;
-	} else
+	} else if (lp_staged)
+		--lp_staged;
+	else
 		total_staged += inc;
 	pthread_mutex_unlock(&stgd_lock);
 }
@@ -1180,6 +1160,61 @@ static int real_staged(void)
 	return ret;
 }
 
+/* Make a live pool current according to pool_strategy. With every pool idle
+ * nothing is switched, but inc_staged() is still called on the way out. */
+static void switch_pools(void)
+{
+	struct pool *pool = current_pool(), *last_pool;	/* valid on the early-out path too */
+	int i, pools_active = 0;
+
+	for (i = 0; i < total_pools; i++) {
+		if (!pools[i].idle)
+			pools_active++;
+	}
+	if (!pools_active) {
+		applog(LOG_ERR, "No pools active, waiting...");
+		goto out;
+	}
+
+	pthread_mutex_lock(&control_lock);
+	last_pool = currentpool;
+	switch (pool_strategy) {
+		/* Both of these set to the first pool that is not idle */
+		case POOL_FAILOVER:
+		case POOL_LOADBALANCE:
+			for (i = 0; i < total_pools; i++) {
+				if (!pools[i].idle) {
+					pool_no = i;
+					break;
+				}
+			}
+			break;
+		/* Both of these cycle to the next live pool, staying put if none */
+		case POOL_ROUNDROBIN:
+		case POOL_ROTATE:
+			for (i = 1; i < total_pools; i++) {
+				if (!pools[(pool_no + i) % total_pools].idle)
+					break;
+			}
+			pool_no = (pool_no + i) % total_pools;	/* i == total_pools wraps back to pool_no */
+			break;
+		default:
+			break;
+	}
+	pool = currentpool = &pools[pool_no];
+	pthread_mutex_unlock(&control_lock);
+
+	if (pool != last_pool)
+		applog(LOG_WARNING, "Switching to %s", pool->rpc_url);
+
+	/* Reset the queued amount to allow more to be queued for the new pool */
+	pthread_mutex_lock(&qd_lock);
+	total_queued = 0;
+	pthread_mutex_unlock(&qd_lock);
+out:
+	inc_staged(pool, 1, true);
+}
+
 static void set_curblock(char *hexstr)
 {
 	struct timeval tv_now;
@@ -1430,6 +1465,7 @@ static bool pool_active(struct pool *pool)
 			pool->getwork_requested++;
 			inc_queued();
 			ret = true;
+			gettimeofday(&pool->tv_idle, NULL);
 		} else {
 			applog(LOG_DEBUG, "Successfully retreived but FAILED to decipher work from pool %u %s",
 			       pool->pool_no, pool->rpc_url);
@@ -1444,6 +1480,20 @@ out:
 	return ret;
 }
 
+static void pool_died(struct pool *pool)
+{	/* Report an unresponsive pool, restamp its idle time, then re-pick a pool */
+	applog(LOG_WARNING, "Pool %d %s not responding!", pool->pool_no, pool->rpc_url);
+	gettimeofday(&pool->tv_idle, NULL);	/* restart the interval that callers measure from tv_idle */
+	switch_pools();	/* NOTE(review): pool->idle is not set here; switch_pools() only skips pools flagged idle - confirm callers flag it */
+}
+
+static void pool_resus(struct pool *pool)
+{	/* Report a pool that is answering again; under failover, re-pick if it ranks before the current one */
+	applog(LOG_WARNING, "Pool %d %s recovered", pool->pool_no, pool->rpc_url);
+	if (pool->pool_no < pool_no && pool_strategy == POOL_FAILOVER)	/* lower index than the pool in use */
+		switch_pools();
+}
+
 static bool queue_request(void)
 {
 	int maxq = opt_queue + mining_threads;
@@ -1552,29 +1602,20 @@ retry:
 		uint32_t ntime;
 
 		/* Only print this message once each time we shift to localgen */
-		if (!pool_tset(pool, &pool->localgen)) {
+		if (!pool_tset(pool, &pool->idle)) {
 			applog(LOG_WARNING, "Server not providing work fast enough, generating work locally");
 			pool->localgen_occasions++;
 			total_lo++;
-			gettimeofday(&pool->tv_localgen, NULL);
+			gettimeofday(&pool->tv_idle, NULL);
 		} else {
 			struct timeval tv_now, diff;
 
 			gettimeofday(&tv_now, NULL);
-			timeval_subtract(&diff, &tv_now, &pool->tv_localgen);
-			if (total_pools > 1) {
-				/* Attempt to switch pools if this one has been unresponsive for >half
-				 * a block's duration */
-				if (diff.tv_sec > 300) {
-					switch_pools();
-					inc_staged(pool, 1, true);
-					goto retry;
-				}
-			} else if (diff.tv_sec > 600) {
-				/* A new block appears on average every 10 mins */
-				applog(LOG_WARNING, "Prolonged outage. Going idle till network recovers.");
-				/* Force every thread to wait for new work */
-				inc_staged(pool, 1, true);
+			timeval_subtract(&diff, &tv_now, &pool->tv_idle);
+			/* Attempt to switch pools if this one has been unresponsive for >half
+				* a block's duration */
+			if (diff.tv_sec > 300) {
+				pool_died(pool);
 				goto retry;
 			}
 		}
@@ -1594,23 +1635,18 @@ retry:
 	/* wait for 1st response, or get cached response */
 	work_heap = tq_pop(getq, &abstime);
 	if (unlikely(!work_heap)) {
-		if (total_pools > 1) {
-			/* Attempt to switch pools if this one has mandatory
-			 * work that has timed out or does not support rolltime */
-			pool->localgen_occasions++;
-			total_lo++;
-			switch_pools();
-			inc_staged(pool, 1, true);
-			goto retry;
-		}
-		if (!pool_tset(pool, &pool->localgen))
-			applog(LOG_WARNING, "Timed out waiting for work from server");
+		/* Attempt to switch pools if this one has mandatory work that
+		 * has timed out or does not support rolltime */
+		pool->localgen_occasions++;
+		total_lo++;
+		pool_died(pool);
 		goto retry;
 	}
 
+	pool = work_heap->pool;
 	/* If we make it here we have succeeded in getting fresh work */
-	if (pool_tclear(pool, &pool->localgen))
-		applog(LOG_WARNING, "Resuming with work from server");
+	if (pool_tclear(pool, &pool->idle))
+		pool_resus(pool);
 	dec_queued();
 
 	memcpy(work, work_heap, sizeof(*work));
@@ -2366,6 +2402,18 @@ static void *watchdog_thread(void *userdata)
 		}
 
 		gettimeofday(&now, NULL);
+
+		for (i = 0; i < total_pools; i++) {
+			struct pool *pool = &pools[i];
+
+			/* Re-test a pool flagged idle at most once every minute */
+			if (pool->idle && now.tv_sec - pool->tv_idle.tv_sec > 60) {
+				gettimeofday(&pool->tv_idle, NULL);
+				if (pool_active(pool) && pool_tclear(pool, &pool->idle))
+					pool_resus(pool);
+			}
+		}
+
 		//for (i = 0; i < mining_threads; i++) {
 		for (i = 0; i < gpu_threads; i++) {
 			struct thr_info *thr = &thr_info[i];
@@ -2459,7 +2507,7 @@ static void print_summary(void)
 
 int main (int argc, char *argv[])
 {
-	unsigned int i, j = 0, x, y;
+	unsigned int i, j = 0, x, y, pools_active = 0;
 	struct sigaction handler;
 	struct thr_info *thr;
 	char name[256];
@@ -2688,14 +2736,20 @@ int main (int argc, char *argv[])
 		struct pool *pool;
 
 		pool = &pools[i];
-		if (pool_active(pool))
+		if (pool_active(pool)) {
 			applog(LOG_INFO, "Pool %d %s active", pool->pool_no, pool->rpc_url);
-		else {
+			pools_active++;
+		} else {
 			applog(LOG_WARNING, "Unable to get work from pool %d %s", pool->pool_no, pool->rpc_url);
 			pool->idle = true;
 		}
 	}
 
+	if (!pools_active) {
+		applog(LOG_ERR, "No pools active! Exiting.");
+		return 0;
+	}
+
 #ifdef HAVE_OPENCL
 	i = 0;
 
diff --git a/miner.h b/miner.h
index 089ccd5..4396f14 100644
--- a/miner.h
+++ b/miner.h
@@ -269,7 +269,6 @@ struct pool {
 	int pool_no;
 	int accepted, rejected;
 	bool submit_fail;
-	bool localgen;
 	bool idle;
 	bool has_rolltime;
 	bool probed;
@@ -278,7 +277,7 @@ struct pool {
 	unsigned int discarded_work;
 	unsigned int localgen_occasions;
 	unsigned int remotefail_occasions;
-	struct timeval tv_localgen;
+	struct timeval tv_idle;
 
 	char *rpc_url;
 	char *rpc_userpass;