Commit 93f4163aca9dd4fd1d803d4b993eca55da49e561

Con Kolivas 2011-08-24T17:32:36

Create a hash list of all the blocks seen so far and search it to detect when a new block has definitely appeared, then use that information to detect stale work and discard it.
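
The patch below keys the list on the hex string of the first 18 bytes of the work data and looks it up with uthash (the UT_hash_handle / HASH_ADD_STR / HASH_FIND_STR macros visible in the diff). As a rough illustration of that pattern only, here is a minimal standalone sketch, assuming the uthash header is on the include path; the helper name new_block_seen() and the main() driver are illustrative and not part of the patch, and unlike the real code this sketch does no locking.

	/* Sketch of the "seen blocks" detection idea, not the cgminer code itself. */
	#include <stdio.h>
	#include <stdlib.h>
	#include <string.h>
	#include "uthash.h"

	struct block {
		char hash[37];     /* 36 hex chars of the work's block field + NUL */
		UT_hash_handle hh; /* makes this struct usable as a uthash entry */
	};

	static struct block *blocks; /* head of the table of blocks seen so far */

	/* Return 1 the first time a 36-char key is seen (and record it),
	 * 0 if it has already been recorded. */
	static int new_block_seen(const char *key)
	{
		struct block *s;

		HASH_FIND_STR(blocks, key, s);
		if (s)
			return 0; /* known block: work based on it is still current */
		s = calloc(1, sizeof(*s));
		if (!s)
			exit(1);
		strncpy(s->hash, key, 36); /* hash[36] stays NUL from calloc */
		HASH_ADD_STR(blocks, hash, s); /* keyed on the hash[] member */
		return 1; /* first sighting: treat as a new network block */
	}

	int main(void)
	{
		printf("%d\n", new_block_seen("000000000000000000000000000000000000")); /* 1 */
		printf("%d\n", new_block_seen("000000000000000000000000000000000000")); /* 0 */
		printf("%d\n", new_block_seen("00000000deadbeef00000000000000000000")); /* 1 */
		return 0;
	}

In the patch itself the find and add are wrapped in getq->mutex because several threads stage work concurrently, and a first sighting additionally updates current_block and triggers restart_threads() so stale staged work is discarded and requeued.
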

diff --git a/main.c b/main.c
index 1a0a440..add64b8 100644
--- a/main.c
+++ b/main.c
@@ -270,6 +270,13 @@ static char *current_hash;
 static char datestamp[40];
 static char blocktime[30];
 
+struct block {
+	char hash[37];
+	UT_hash_handle hh;
+};
+
+static struct block *blocks = NULL;
+
 static char *opt_kernel = NULL;
 
 #if defined(unix)
@@ -1829,18 +1836,13 @@ static bool stale_work(struct work *work)
 	if ((now.tv_sec - work->tv_staged.tv_sec) >= opt_scantime)
 		return true;
 
-	/* Only use the primary pool for determination as the work may
-	 * interleave at times of new blocks */
-	if (work->pool != current_pool())
-		return ret;
-
-	hexstr = bin2hex(work->data, 36);
+	hexstr = bin2hex(work->data, 18);
 	if (unlikely(!hexstr)) {
 		applog(LOG_ERR, "submit_work_thread OOM");
 		return ret;
 	}
 
-	if (strncmp(hexstr, current_block, 36))
+	if (strcmp(hexstr, current_block))
 		ret = true;
 
 	free(hexstr);
@@ -2031,6 +2033,95 @@ static void switch_pools(struct pool *selected)
 	inc_staged(pool, 1, true);
 }
 
+static void discard_work(struct work *work)
+{
+	if (!work->clone && !work->rolls && !work->mined) {
+		if (work->pool)
+			work->pool->discarded_work++;
+		total_discarded++;
+		if (opt_debug)
+			applog(LOG_DEBUG, "Discarded work");
+	} else if (opt_debug)
+		applog(LOG_DEBUG, "Discarded cloned or rolled work");
+	free_work(work);
+}
+
+/* This is overkill, but at least we'll know accurately how much work is
+ * queued to prevent ever being left without work */
+static void inc_queued(void)
+{
+	mutex_lock(&qd_lock);
+	total_queued++;
+	mutex_unlock(&qd_lock);
+}
+
+static void dec_queued(void)
+{
+	mutex_lock(&qd_lock);
+	if (total_queued > 0)
+		total_queued--;
+	mutex_unlock(&qd_lock);
+	dec_staged(1);
+}
+
+static int requests_queued(void)
+{
+	int ret;
+
+	mutex_lock(&qd_lock);
+	ret = total_queued;
+	mutex_unlock(&qd_lock);
+	return ret;
+}
+
+static int discard_stale(void)
+{
+	struct work *work, *tmp;
+	int i, stale = 0;
+
+	mutex_lock(&getq->mutex);
+	HASH_ITER(hh, staged_work, work, tmp) {
+		if (stale_work(work)) {
+			HASH_DEL(staged_work, work);
+			discard_work(work);
+			stale++;
+		}
+	}
+	mutex_unlock(&getq->mutex);
+
+	if (opt_debug)
+		applog(LOG_DEBUG, "Discarded %d stales that didn't match current hash", stale);
+
+	/* Dec queued outside the loop to not have recursive locks */
+	for (i = 0; i < stale; i++)
+		dec_queued();
+
+	return stale;
+}
+
+static bool queue_request(struct thr_info *thr, bool needed);
+
+static void restart_threads(void)
+{
+	struct pool *pool = current_pool();
+	int i, stale;
+
+	block_changed = BLOCK_NONE;
+
+	/* Discard staged work that is now stale */
+	stale = discard_stale();
+
+	for (i = 0; i < stale; i++)
+		queue_request(NULL, true);
+
+	/* Temporarily increase the staged count so that the pool is not seen
+	 * as lagging when a new block hits */
+	inc_staged(pool, mining_threads, true);
+
+	for (i = 0; i < mining_threads; i++)
+		work_restart[i].restart = 1;
+}
+
 static void set_curblock(char *hexstr, unsigned char *hash)
 {
 	unsigned char hash_swap[32];
@@ -2041,7 +2132,7 @@ static void set_curblock(char *hexstr, unsigned char *hash)
 	 * we might be accessing its data elsewhere */
 	if (current_hash)
 		old_hash = current_hash;
-	memcpy(current_block, hexstr, 36);
+	strcpy(current_block, hexstr);
 	gettimeofday(&tv_now, NULL);
 	get_timestamp(blocktime, &tv_now);
 	swap256(hash_swap, hash);
@@ -2054,33 +2145,40 @@ static void set_curblock(char *hexstr, unsigned char *hash)
 
 static void test_work_current(struct work *work)
 {
+	struct block *s;
 	char *hexstr;
 
-	/* Only use the primary pool for determination */
-	if (work->pool != current_pool() || work->cloned || work->rolls || work->clone)
-		return;
-
-	hexstr = bin2hex(work->data, 36);
+	hexstr = bin2hex(work->data, 18);
 	if (unlikely(!hexstr)) {
 		applog(LOG_ERR, "stage_thread OOM");
 		return;
 	}
 
-	/* current_block is blanked out on successful longpoll */
-	if (unlikely(strncmp(hexstr, current_block, 36))) {
+	/* Search to see if this block exists yet and if not, consider it a
+	 * new block and set the current block details to this one */
+	mutex_lock(&getq->mutex);
+	HASH_FIND_STR(blocks, hexstr, s);
+	mutex_unlock(&getq->mutex);
+	if (!s) {
+		s = calloc(sizeof(struct block), 1);
+		if (unlikely(!s))
+			quit (1, "test_work_current OOM");
+		strcpy(s->hash, hexstr);
+		mutex_lock(&getq->mutex);
+		HASH_ADD_STR(blocks, hash, s);
+		mutex_unlock(&getq->mutex);
+		set_curblock(hexstr, work->data);
+
+		new_blocks++;
 		if (block_changed != BLOCK_LP && block_changed != BLOCK_FIRST) {
 			block_changed = BLOCK_DETECT;
-			new_blocks++;
 			if (have_longpoll)
 				applog(LOG_WARNING, "New block detected on network before longpoll, waiting on fresh work");
 			else
 				applog(LOG_WARNING, "New block detected on network, waiting on fresh work");
-			/* As we can't flush the work from here, signal the
-			 * wakeup thread to restart all the threads */
-			work_restart[watchdog_thr_id].restart = 1;
 		} else
 			block_changed = BLOCK_NONE;
-		set_curblock(hexstr, work->data);
+		restart_threads();
 	}
 
 	free(hexstr);
@@ -2804,34 +2902,6 @@ out_unlock:
 	}
 }
 
-/* This is overkill, but at least we'll know accurately how much work is
- * queued to prevent ever being left without work */
-static void inc_queued(void)
-{
-	mutex_lock(&qd_lock);
-	total_queued++;
-	mutex_unlock(&qd_lock);
-}
-
-static void dec_queued(void)
-{
-	mutex_lock(&qd_lock);
-	if (total_queued > 0)
-		total_queued--;
-	mutex_unlock(&qd_lock);
-	dec_staged(1);
-}
-
-static int requests_queued(void)
-{
-	int ret;
-
-	mutex_lock(&qd_lock);
-	ret = total_queued;
-	mutex_unlock(&qd_lock);
-	return ret;
-}
-
 static bool pool_active(struct pool *pool, bool pinging)
 {
 	bool ret = false;
@@ -2951,19 +3021,6 @@ out:
 	return true;
 }
 
-static void discard_work(struct work *work)
-{
-	if (!work->clone && !work->rolls && !work->mined) {
-		if (work->pool)
-			work->pool->discarded_work++;
-		total_discarded++;
-		if (opt_debug)
-			applog(LOG_DEBUG, "Discarded work");
-	} else if (opt_debug)
-		applog(LOG_DEBUG, "Discarded cloned or rolled work");
-	free_work(work);
-}
-
 struct work *hash_pop(const struct timespec *abstime)
 {
 	struct work *work = NULL;
@@ -2990,55 +3047,6 @@ out:
 	return work;
 }
 
-static void discard_staged(void)
-{
-	struct timespec abstime = {};
-	struct timeval now;
-	struct work *work_heap;
-
-	/* Just in case we fell in a hole and missed a queue filling */
-	if (unlikely(!requests_staged()))
-		return;
-
-	gettimeofday(&now, NULL);
-	abstime.tv_sec = now.tv_sec + 60;
-
-	if (opt_debug)
-		applog(LOG_DEBUG, "Popping work to discard staged");
-
-	work_heap = hash_pop(&abstime);
-	if (unlikely(!work_heap))
-		return;
-
-	discard_work(work_heap);
-	dec_queued();
-}
-
-static void flush_requests(void)
-{
-	struct pool *pool = current_pool();
-	int i, stale;
-
-	/* We should have one fresh work item staged from the block change. */
-	stale = requests_staged() - 1;
-
-	/* Temporarily increase the staged count so that get_work thinks there
-	 * is work available instead of making threads reuse existing work */
-	inc_staged(pool, mining_threads, true);
-
-	for (i = 0; i < stale; i++) {
-		/* Queue a whole batch of new requests */
-		if (unlikely(!queue_request(NULL, true))) {
-			applog(LOG_ERR, "Failed to queue requests in flush_requests");
-			kill_work();
-			break;
-		}
-		/* Pop off the old requests. Cancelling the requests would be better
-		* but is tricky */
-		discard_staged();
-	}
-}
-
 static inline bool should_roll(struct work *work)
 {
 	int rs;
@@ -3123,6 +3131,7 @@ retry:
 		if (requested && !pool_tset(pool, &pool->lagging)) {
 			applog(LOG_WARNING, "Pool %d not providing work fast enough",
 				pool->pool_no);
+			applog(LOG_WARNING, "staged %d", requests_staged());
 			pool->localgen_occasions++;
 			total_lo++;
 		}
@@ -3801,20 +3810,6 @@ out:
 }
 #endif /* HAVE_OPENCL */
 
-static void restart_threads(void)
-{
-	int i;
-
-	if (block_changed == BLOCK_DETECT)
-		block_changed = BLOCK_NONE;
-
-	/* Discard old queued requests and get new ones */
-	flush_requests();
-
-	for (i = 0; i < mining_threads; i++)
-		work_restart[i].restart = 1;
-}
-
 /* Stage another work item from the work returned in a longpoll */
 static void convert_to_work(json_t *val, bool rolltime)
 {
@@ -3899,9 +3894,7 @@ static void *longpoll_thread(void *userdata)
 			 * sure it's only done once per new block */
 			if (block_changed != BLOCK_DETECT) {
 				block_changed = BLOCK_LP;
-				new_blocks++;
 				applog(LOG_WARNING, "LONGPOLL detected new block on network, waiting on fresh work");
-				restart_threads();
 			} else {
 				applog(LOG_WARNING, "LONGPOLL received after new block already detected");
 				block_changed = BLOCK_NONE;
@@ -4490,6 +4483,8 @@ out:
 int main (int argc, char *argv[])
 {
 	unsigned int i, x, y, pools_active = 0;
+	struct block *block, *tmpblock;
+	struct work *work, *tmpwork;
 	struct sigaction handler;
 	struct thr_info *thr;
 	char name[256];
@@ -4528,11 +4523,13 @@ int main (int argc, char *argv[])
 			skip_to_bench = 1;
 	#endif // defined(WIN32)
 
-	for (i = 0; i < 36; i++)
-		strcat(current_block, "0");
-	current_hash = calloc(sizeof(current_hash), 1);
-	if (unlikely(!current_hash))
+	block = calloc(sizeof(struct block), 1);
+	if (unlikely(!block))
 		quit (1, "main OOM");
+	for (i = 0; i < 36; i++)
+		strcat(block->hash, "0");
+	HASH_ADD_STR(blocks, hash, block);
+	strcpy(current_block, block->hash);
 
 	// Reckon number of cores in the box
 	#if defined(WIN32)
@@ -4940,6 +4937,15 @@ int main (int argc, char *argv[])
 	if (opt_n_threads)
 		free(cpus);
 
+	HASH_ITER(hh, staged_work, work, tmpwork) {
+		HASH_DEL(staged_work, work);
+		free_work(work);
+	}
+	HASH_ITER(hh, blocks, block, tmpblock) {
+		HASH_DEL(blocks, block);
+		free(block);
+	}
+
 	curl_global_cleanup();
 
 	return 0;