Commit 1b2355176b35c248fa74384949498c74bf6798eb

Con Kolivas 2011-07-07T10:04:37

Merge branch 'cgminer' into mfontani Conflicts: main.c

diff --git a/main.c b/main.c
index 8f5f85b..2298e28 100644
--- a/main.c
+++ b/main.c
@@ -145,6 +145,7 @@ static char *rpc_user, *rpc_pass;
 struct thr_info *thr_info;
 static int work_thr_id;
 int longpoll_thr_id;
+static int stage_thr_id;
 struct work_restart *work_restart = NULL;
 pthread_mutex_t time_lock;
 static pthread_mutex_t hash_lock;
@@ -155,6 +156,8 @@ static int accepted, rejected;
 int hw_errors;
 static int total_queued;
 static unsigned int getwork_requested = 0;
+static char current_block[36];
+static char blank[36];
 
 static void applog_and_exit(const char *fmt, ...)
 {
@@ -624,8 +627,6 @@ static void kill_work(void)
 	}
 }
 
-static char current_block[36];
-
 static void *get_work_thread(void *userdata)
 {
 	struct workio_cmd *wc = (struct workio_cmd *)userdata;
@@ -656,7 +657,7 @@ static void *get_work_thread(void *userdata)
 	}
 
 	/* send work to requesting thread */
-	if (unlikely(!tq_push(wc->thr->q, ret_work))) {
+	if (unlikely(!tq_push(thr_info[stage_thr_id].q, ret_work))) {
 		applog(LOG_ERR, "Failed to tq_push work in workio_get_work");
 		kill_work();
 		free(ret_work);
@@ -682,23 +683,30 @@ static void *submit_work_thread(void *userdata)
 {
 	struct workio_cmd *wc = (struct workio_cmd *)userdata;
 	int failures = 0;
+	char *hexstr;
 
 	pthread_detach(pthread_self());
-	if (unlikely(strncmp((const char *)wc->u.work->data, current_block, 36))) {
-		applog(LOG_INFO, "Stale work detected, discarding");
+
+	hexstr = bin2hex(wc->u.work->data, 36);
+	if (unlikely(!hexstr)) {
+		applog(LOG_ERR, "submit_work_thread OOM");
 		goto out;
 	}
+	if (unlikely(strncmp(hexstr, current_block, 36))) {
+		applog(LOG_INFO, "Stale work detected, discarding");
+		goto out_free;
+	}
 
 	/* submit solution to bitcoin via JSON-RPC */
 	while (!submit_upstream_work(wc->u.work)) {
-		if (unlikely(strncmp((const char *)wc->u.work->data, current_block, 36))) {
+		if (unlikely(strncmp(hexstr, current_block, 36))) {
 			applog(LOG_INFO, "Stale work detected, discarding");
-			goto out;
+			goto out_free;
 		}
 		if (unlikely((opt_retries >= 0) && (++failures > opt_retries))) {
 			applog(LOG_ERR, "Failed %d retries ...terminating workio thread", opt_retries);
 			kill_work();
-			goto out;
+			goto out_free;
 		}
 
 		/* pause, then restart work-request loop */
@@ -706,7 +714,8 @@ static void *submit_work_thread(void *userdata)
 			opt_fail_pause);
 		sleep(opt_fail_pause);
 	}
-
+out_free:
+	free(hexstr);
 out:
 	workio_cmd_free(wc);
 	return NULL;
@@ -723,6 +732,61 @@ static bool workio_submit_work(struct workio_cmd *wc)
 	return true;
 }
 
+static void *stage_thread(void *userdata)
+{
+	struct thr_info *mythr = userdata;
+	bool ok = true;
+	unsigned int i;
+
+	for (i = 0; i < 36; i++) {
+		strcat(current_block, "0");
+		strcat(blank, "0");
+	}
+
+	while (ok) {
+		struct work *work = NULL;
+		char *hexstr;
+
+		work = tq_pop(mythr->q, NULL);
+		if (unlikely(!work)) {
+			applog(LOG_ERR, "Failed to tq_pop in stage_thread");
+			ok = false;
+			break;
+		}
+
+		hexstr = bin2hex(work->data, 36);
+		if (unlikely(!hexstr)) {
+			applog(LOG_ERR, "stage_thread OOM");
+			break;
+		}
+
+		/* current_block is blanked out on successful longpoll */
+		if (likely(strncmp(current_block, blank, 36))) {
+			if (unlikely(strncmp(hexstr, current_block, 36))) {
+				if (want_longpoll)
+					applog(LOG_WARNING, "New block detected, possible missed longpoll, flushing work queue      ");
+				else
+					applog(LOG_WARNING, "New block detected, flushing work queue                                ");
+				/* As we can't flush the work from here, signal
+				 * the wakeup thread to restart all the
+				 * threads */
+				work_restart[stage_thr_id].restart = 1;
+			}
+		}
+		memcpy(current_block, hexstr, 36);
+		free(hexstr);
+
+		if (unlikely(!tq_push(thr_info[0].q, work))) {
+			applog(LOG_ERR, "Failed to tq_push work in stage_thread");
+			ok = false;
+			break;
+		}
+	}
+
+	tq_freeze(mythr->q);
+	return NULL;
+}
+
 static void *workio_thread(void *userdata)
 {
 	struct thr_info *mythr = userdata;
@@ -898,11 +962,19 @@ static bool discard_request(void)
 	return true;
 }
 
-static void flush_requests(void)
+static void flush_requests(bool longpoll)
 {
 	int i, extra;
 
 	extra = requests_queued();
+	/* When flushing from longpoll, we don't know the new work yet. When
+	 * not flushing from longpoll, the first work item is valid so do not
+	 * discard it */
+	if (longpoll)
+		memcpy(current_block, blank, 36);
+	else
+		extra--;
+
 	for (i = 0; i < extra; i++) {
 		/* Queue a whole batch of new requests */
 		if (unlikely(!queue_request())) {
@@ -942,7 +1014,7 @@ retry:
 	dec_queued();
 
 	memcpy(work, work_heap, sizeof(*work));
-	memcpy(current_block, work->data, 36);
+	
 	ret = true;
 	free(work_heap);
 out:
@@ -1356,12 +1428,12 @@ out:
 }
 #endif /* HAVE_OPENCL */
 
-static void restart_threads(void)
+static void restart_threads(bool longpoll)
 {
 	int i;
 
 	/* Discard old queued requests and get new ones */
-	flush_requests();
+	flush_requests(longpoll);
 
 	for (i = 0; i < opt_n_threads + gpu_threads; i++)
 		work_restart[i].restart = 1;
@@ -1415,10 +1487,8 @@ static void *longpoll_thread(void *userdata)
 			failures = 0;
 			json_decref(val);
 
-			if (!opt_quiet)
-				printf("LONGPOLL detected new block                                                              \n");
-			applog(LOG_INFO, "LONGPOLL detected new block");
-			restart_threads();
+			applog(LOG_WARNING, "LONGPOLL detected new block                                               ");
+			restart_threads(true);
 		} else {
 			if (failures++ < 10) {
 				sleep(30);
@@ -1453,6 +1523,10 @@ static void *wakeup_thread(void *userdata)
 	while (1) {
 		sleep(interval);
 		hashmeter(-1, &zero_tv, 0);
+		if (unlikely(work_restart[stage_thr_id].restart)) {
+			restart_threads(false);
+			work_restart[stage_thr_id].restart = 0;
+		}
 	}
 
 	return NULL;
@@ -1526,11 +1600,11 @@ int main (int argc, char *argv[])
 		openlog("cpuminer", LOG_PID, LOG_USER);
 #endif
 
-	work_restart = calloc(opt_n_threads + gpu_threads, sizeof(*work_restart));
+	work_restart = calloc(opt_n_threads + 4 + gpu_threads, sizeof(*work_restart));
 	if (!work_restart)
 		return 1;
 
-	thr_info = calloc(opt_n_threads + 3 + gpu_threads, sizeof(*thr));
+	thr_info = calloc(opt_n_threads + 4 + gpu_threads, sizeof(*thr));
 	if (!thr_info)
 		return 1;
 
@@ -1584,6 +1658,17 @@ int main (int argc, char *argv[])
 		}
 	}
 
+	stage_thr_id = opt_n_threads + gpu_threads + 3;
+	thr = &thr_info[stage_thr_id];
+	thr->q = tq_new();
+	if (!thr->q)
+		return 1;
+	/* start stage thread */
+	if (pthread_create(&thr->pth, NULL, stage_thread, thr)) {
+		applog(LOG_ERR, "stage thread create failed");
+		return 1;
+	}
+
 	/* Put enough work in the queue */
 	for (i = 0; i < opt_queue + opt_n_threads + gpu_threads; i++) {
 		if (unlikely(!queue_request())) {