Commit eda382990a43d9e4e9af3d87ec3657a98a956a18

Con Kolivas 2012-04-27T11:58:47

Create one longpoll thread per pool, using backup pools' longpoll for those pools that don't support longpoll themselves. Use the work created from a longpoll return on a non-current pool only if failover-only is not enabled, and only flag the work as longpoll work if it came from the current pool. This works around the problem of trying to restart the single longpoll thread on pool changes, which was leading to race conditions. It will also result in fewer work restarts from the multiple longpolls received from different pools.

diff --git a/cgminer.c b/cgminer.c
index 47a43be..fdd9f5a 100644
--- a/cgminer.c
+++ b/cgminer.c
@@ -143,7 +143,6 @@ char *cgminer_path;
 
 struct thr_info *thr_info;
 static int work_thr_id;
-int longpoll_thr_id;
 static int stage_thr_id;
 static int watchpool_thr_id;
 static int watchdog_thr_id;
@@ -379,6 +378,7 @@ static void sharelog(const char*disposition, const struct work*work)
 
 static void *submit_work_thread(void *userdata);
 static void *get_work_thread(void *userdata);
+static void *longpoll_thread(void *userdata);
 
 static void add_pool(void)
 {
@@ -402,6 +402,8 @@ static void add_pool(void)
 		quit(1, "Failed to create pool submit thread");
 	if (unlikely(pthread_create(&pool->getwork_thread, NULL, get_work_thread, (void *)pool)))
 		quit(1, "Failed to create pool getwork thread");
+	if (unlikely(pthread_create(&pool->longpoll_thread, NULL, longpoll_thread, (void *)pool)))
+		quit(1, "Failed to create pool longpoll thread");
 }
 
 /* Pool variant of test and set */
@@ -1935,11 +1937,6 @@ static void __kill_work(void)
 	thr = &thr_info[stage_thr_id];
 	thr_info_cancel(thr);
 
-	applog(LOG_DEBUG, "Killing off longpoll thread");
-	thr = &thr_info[longpoll_thr_id];
-	if (have_longpoll)
-		thr_info_cancel(thr);
-
 	applog(LOG_DEBUG, "Killing off API thread");
 	thr = &thr_info[api_thr_id];
 	thr_info_cancel(thr);
@@ -1987,9 +1984,6 @@ static void sighandler(int __maybe_unused sig)
 	kill_work();
 }
 
-static void start_longpoll(void);
-static void stop_longpoll(void);
-
 /* One get work thread is created per pool, so as to use one curl handle for
  * all getwork reqeusts from the same pool, minimising connections opened, but
  * separate from the submit work curl handle to not delay share submissions due
@@ -2015,12 +2009,6 @@ static void *get_work_thread(void *userdata)
 		struct work *ret_work;
 		int failures = 0;
 
-		if (unlikely(!pool->is_lp && pool == current_pool() &&
-			pool->hdr_path && !pool_tset(pool, &pool->lp_sent))) {
-				stop_longpoll();
-				start_longpoll();
-		}
-
 		ret_work = make_work();
 
 		if (wc->thr)
@@ -2568,6 +2556,7 @@ void remove_pool(struct pool *pool)
 	}
 	/* Give it an invalid number */
 	pool->pool_no = total_pools;
+	pool->removed = true;
 	total_pools--;
 }
 
@@ -3820,19 +3809,28 @@ enum {
 /* Stage another work item from the work returned in a longpoll */
 static void convert_to_work(json_t *val, bool rolltime, struct pool *pool)
 {
+	struct pool *cp = current_pool();
 	struct work *work, *work_clone;
 	bool rc;
 
+	/* Don't use as work if we have failover-only enabled */
+	if (pool != cp && opt_fail_only)
+		return;
+
 	work = make_work();
 
-	rc= work_decode(json_object_get(val, "result"), work);
+	rc = work_decode(json_object_get(val, "result"), work);
 	if (unlikely(!rc)) {
 		applog(LOG_ERR, "Could not convert longpoll data to work");
+		free_work(work);
 		return;
 	}
 	work->pool = pool;
 	work->rolltime = rolltime;
-	work->longpoll = true;
+
+	/* Only flag this as longpoll work if the pool is the current pool */
+	if (pool == cp)
+		work->longpoll = true;
 
 	/* We'll be checking this work item twice, but we already know it's
 	 * from a new block so explicitly force the new block detection now
@@ -3864,9 +3862,8 @@ static void convert_to_work(json_t *val, bool rolltime, struct pool *pool)
 /* If we want longpoll, enable it for the chosen default pool, or, if
  * the pool does not support longpoll, find the first one that does
  * and use its longpoll support */
-static struct pool *select_longpoll_pool(void)
+static struct pool *select_longpoll_pool(struct pool *cp)
 {
-	struct pool *cp = current_pool();
 	int i;
 
 	if (cp->hdr_path)
@@ -3882,37 +3879,37 @@ static struct pool *select_longpoll_pool(void)
 
 static void *longpoll_thread(void *userdata)
 {
-	struct thr_info *mythr = userdata;
+	struct pool *cp = (struct pool *)userdata;
+	/* This *pool is the source of the actual longpoll, not the pool we've
+	 * tied it to */
+	struct pool *pool = NULL;
 	struct timeval start, end;
-	struct pool *sp, *pool;
 	CURL *curl = NULL;
 	int failures = 0;
 	bool rolltime;
 
-	pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
-
 	curl = curl_easy_init();
 	if (unlikely(!curl)) {
 		applog(LOG_ERR, "CURL initialisation failed");
 		goto out;
 	}
 
-	tq_pop(mythr->q, NULL);
-
-	pool = select_longpoll_pool();
+retry_pool:
+	pool = select_longpoll_pool(cp);
 	if (!pool) {
-		applog(LOG_WARNING, "No long-poll found on any pool server");
+		applog(LOG_WARNING, "No suitable long-poll found for pool %s", cp->rpc_url);
 		while (!pool) {
-			sleep(30);
-			pool = select_longpoll_pool();
+			sleep(60);
+			pool = select_longpoll_pool(cp);
 		}
 	}
 
-	pool->is_lp = true;
-	have_longpoll = true;
-	applog(LOG_WARNING, "Long-polling activated for %s", pool->lp_url);
+	if (cp == pool)
+		applog(LOG_WARNING, "Long-polling activated for %s", pool->lp_url);
+	else
+		applog(LOG_WARNING, "Long-polling activated for pool %s via %s", cp->rpc_url, pool->lp_url);
 
-	while (1) {
+	while (42) {
 		json_t *val, *soval;
 
 		gettimeofday(&start, NULL);
@@ -3951,45 +3948,23 @@ static void *longpoll_thread(void *userdata)
 				goto out;
 			}
 		}
-		sp = select_longpoll_pool();
-		if (sp != pool) {
-			pool->is_lp = false;
-			pool = sp;
-			pool->is_lp = true;
-			applog(LOG_WARNING, "Long-polling changed to %s", pool->lp_url);
+		if (pool != cp) {
+			pool = select_longpoll_pool(cp);
+			if (unlikely(!pool))
+				goto retry_pool;
 		}
+
+		if (unlikely(pool->removed))
+			break;
 	}
 
 out:
 	if (curl)
 		curl_easy_cleanup(curl);
 
-	tq_freeze(mythr->q);
 	return NULL;
 }
 
-static void stop_longpoll(void)
-{
-	struct thr_info *thr = &thr_info[longpoll_thr_id];
-
-	thr_info_cancel(thr);
-	if (have_longpoll)
-		pthread_join(thr->pth, NULL);
-	have_longpoll = false;
-	tq_freeze(thr->q);
-}
-
-static void start_longpoll(void)
-{
-	struct thr_info *thr = &thr_info[longpoll_thr_id];
-
-	tq_thaw(thr->q);
-	if (unlikely(thr_info_create(thr, NULL, longpoll_thread, thr)))
-		quit(1, "longpoll thread create failed");
-	applog(LOG_DEBUG, "Pushing ping to longpoll thread");
-	tq_push(thr_info[longpoll_thr_id].q, &ping);
-}
-
 void reinit_device(struct cgpu_info *cgpu)
 {
 	if (cgpu->api->reinit_device)
@@ -4872,7 +4847,7 @@ int main(int argc, char *argv[])
 			fork_monitor();
 	#endif // defined(unix)
 
-	total_threads = mining_threads + 8;
+	total_threads = mining_threads + 7;
 	work_restart = calloc(total_threads, sizeof(*work_restart));
 	if (!work_restart)
 		quit(1, "Failed to calloc work_restart");
@@ -4893,15 +4868,7 @@ int main(int argc, char *argv[])
 	if (thr_info_create(thr, NULL, workio_thread, thr))
 		quit(1, "workio thread create failed");
 
-	/* init longpoll thread info */
-	longpoll_thr_id = mining_threads + 1;
-	thr = &thr_info[longpoll_thr_id];
-	thr->id = longpoll_thr_id;
-	thr->q = tq_new();
-	if (!thr->q)
-		quit(1, "Failed to tq_new");
-
-	stage_thr_id = mining_threads + 2;
+	stage_thr_id = mining_threads + 1;
 	thr = &thr_info[stage_thr_id];
 	thr->q = tq_new();
 	if (!thr->q)
@@ -4970,8 +4937,6 @@ int main(int argc, char *argv[])
 		}
 	} while (!pools_active);
 
-	start_longpoll();
-
 begin_bench:
 	total_mhashes_done = 0;
 	for (i = 0; i < total_devices; i++) {
@@ -5039,14 +5004,14 @@ begin_bench:
 	gettimeofday(&total_tv_start, NULL);
 	gettimeofday(&total_tv_end, NULL);
 
-	watchpool_thr_id = mining_threads + 3;
+	watchpool_thr_id = mining_threads + 2;
 	thr = &thr_info[watchpool_thr_id];
 	/* start watchpool thread */
 	if (thr_info_create(thr, NULL, watchpool_thread, NULL))
 		quit(1, "watchpool thread create failed");
 	pthread_detach(thr->pth);
 
-	watchdog_thr_id = mining_threads + 4;
+	watchdog_thr_id = mining_threads + 3;
 	thr = &thr_info[watchdog_thr_id];
 	/* start watchdog thread */
 	if (thr_info_create(thr, NULL, watchdog_thread, NULL))
@@ -5054,7 +5019,7 @@ begin_bench:
 	pthread_detach(thr->pth);
 
 	/* Create reinit gpu thread */
-	gpur_thr_id = mining_threads + 5;
+	gpur_thr_id = mining_threads + 4;
 	thr = &thr_info[gpur_thr_id];
 	thr->q = tq_new();
 	if (!thr->q)
@@ -5063,7 +5028,7 @@ begin_bench:
 		quit(1, "reinit_gpu thread create failed");
 
 	/* Create API socket thread */
-	api_thr_id = mining_threads + 6;
+	api_thr_id = mining_threads + 5;
 	thr = &thr_info[api_thr_id];
 	if (thr_info_create(thr, NULL, api_thread, thr))
 		quit(1, "API thread create failed");
@@ -5073,7 +5038,7 @@ begin_bench:
 	/* Create curses input thread for keyboard input. Create this last so
 	 * that we know all threads are created since this can call kill_work
 	 * to try and shut down ll previous threads. */
-	input_thr_id = mining_threads + 7;
+	input_thr_id = mining_threads + 6;
 	thr = &thr_info[input_thr_id];
 	if (thr_info_create(thr, NULL, input_thread, thr))
 		quit(1, "input thread create failed");
diff --git a/miner.h b/miner.h
index a8fd407..d08fbbe 100644
--- a/miner.h
+++ b/miner.h
@@ -542,7 +542,6 @@ extern int num_processors;
 extern int hw_errors;
 extern bool use_syslog;
 extern struct thr_info *thr_info;
-extern int longpoll_thr_id;
 extern struct work_restart *work_restart;
 extern struct cgpu_info gpus[MAX_GPUDEVICES];
 extern int gpu_threads;
@@ -603,17 +602,17 @@ struct pool {
 	int pool_no;
 	int prio;
 	int accepted, rejected;
+
 	bool submit_fail;
 	bool idle;
 	bool lagging;
 	bool probed;
 	bool enabled;
 	bool submit_old;
+	bool removed;
 
 	char *hdr_path;
 	char *lp_url;
-	bool lp_sent;
-	bool is_lp;
 
 	unsigned int getwork_requested;
 	unsigned int stale_shares;
@@ -631,6 +630,7 @@ struct pool {
 	struct thread_q *submit_q;
 	struct thread_q *getwork_q;
 
+	pthread_t longpoll_thread;
 	pthread_t submit_thread;
 	pthread_t getwork_thread;
 
diff --git a/util.c b/util.c
index b65ba12..e53dd67 100644
--- a/util.c
+++ b/util.c
@@ -350,8 +350,6 @@ json_t *json_rpc_call(CURL *curl, const char *url,
 	}
 
 	rc = curl_easy_perform(curl);
-	if (longpoll)
-		pool_tclear(pool, &pool->lp_sent);
 	if (rc) {
 		applog(LOG_INFO, "HTTP request failed: %s", curl_err_str);
 		goto err_out;