Commit 88d9d631e30570718f1c14db49cb22735e42fc76

Con Kolivas 2011-06-30T23:36:57

Use two separate curl instances for submit and get and use separate threads for each to prevent one blocking the other.

diff --git a/cpu-miner.c b/cpu-miner.c
index 0428610..b3d2559 100644
--- a/cpu-miner.c
+++ b/cpu-miner.c
@@ -74,6 +74,7 @@ static inline void affine_to_cpu(int id, int cpu)
 enum workio_commands {
 	WC_GET_WORK,
 	WC_SUBMIT_WORK,
+	WC_DIE,	/* queued by kill_work(); workio_thread's switch sets ok = false for it */
 };
 
 struct workio_cmd {
@@ -426,15 +427,48 @@ static void workio_cmd_free(struct workio_cmd *wc)
 	free(wc);
 }
 
-static bool workio_get_work(struct workio_cmd *wc, CURL *curl)
+static void kill_work(void)
+{
+	struct workio_cmd *wc;
+	/* Request shutdown by queueing a WC_DIE command; exits the process if that fails. */
+	applog(LOG_INFO, "Received kill message");
+	wc = calloc(1, sizeof(*wc));
+	if (unlikely(!wc)) {
+		applog(LOG_ERR, "Failed to calloc wc in kill_work");
+		/* We're just trying to die anyway, so forget graceful */
+		exit (1);
+	}
+
+	wc->cmd = WC_DIE;
+	wc->thr = 0;	/* no requesting thread is attached to this command */
+
+	if (unlikely(!tq_push(thr_info[work_thr_id].q, wc))) {	/* presumably the workio thread's queue - confirm work_thr_id */
+		applog(LOG_ERR, "Failed to tq_push work in kill_work");
+		exit (1);
+	}
+}
+
+struct io_data{	/* argument bundle passed to get_work_thread / submit_work_thread */
+	struct workio_cmd *wc;	/* command the thread services and then frees */
+	CURL *curl;	/* curl handle the thread uses for its request */
+};
+
+static pthread_t *get_thread = NULL;	/* allocated on the first workio_get_work() call, then reused */
+static pthread_t *submit_thread = NULL;	/* allocated on the first workio_submit_work() call, then reused */
+
+static void *get_work_thread(void *userdata)
 {
+	struct io_data *io_data = (struct io_data *)userdata;
+	struct workio_cmd *wc = io_data->wc;
+	CURL *curl = io_data->curl;
 	struct work *ret_work;
 	int failures = 0;
 
 	ret_work = calloc(1, sizeof(*ret_work));
 	if (!ret_work) {
 		applog(LOG_ERR, "Failed to calloc ret_work in workio_get_work");
-		return false;
+		kill_work();
+		goto out;
 	}
 
 	/* obtain new work from bitcoin via JSON-RPC */
@@ -442,11 +476,12 @@ static bool workio_get_work(struct workio_cmd *wc, CURL *curl)
 		if (unlikely((opt_retries >= 0) && (++failures > opt_retries))) {
 			applog(LOG_ERR, "json_rpc_call failed, terminating workio thread");
 			free(ret_work);
-			return false;
+			kill_work();
+			goto out;
 		}
 
 		/* pause, then restart work-request loop */
-		applog(LOG_ERR, "json_rpc_call failed, retry after %d seconds",
+		applog(LOG_ERR, "json_rpc_call failed on get work, retry after %d seconds",
 			opt_fail_pause);
 		sleep(opt_fail_pause);
 	}
@@ -454,29 +489,96 @@ static bool workio_get_work(struct workio_cmd *wc, CURL *curl)
 	/* send work to requesting thread */
 	if (unlikely(!tq_push(wc->thr->q, ret_work))) {
 		applog(LOG_ERR, "Failed to tq_push work in workio_get_work");
+		kill_work();
 		free(ret_work);
 	}
 
+out:
+	free(io_data);
+	workio_cmd_free(wc);
+	return NULL;
+}
+
+/* Hand wc to a new get_work_thread, which frees it; returns false if setup fails. */
+static bool workio_get_work(struct workio_cmd *wc, CURL *curl)
+{
+	struct io_data *id = malloc(sizeof(*id));
+
+	if (unlikely(!id)) {
+		applog(LOG_ERR, "Failed to malloc id in workio_get_work");
+		return false;
+	}
+	id->wc = wc;
+	id->curl = curl;
+	if (!get_thread) {
+		get_thread = malloc(sizeof(*get_thread));	/* room for a pthread_t, not for a pointer to one */
+		if (unlikely(!get_thread)) {
+			applog(LOG_ERR, "Failed to malloc get_thread in workio_get_work");
+			free(id);	/* no thread took ownership of it */
+			return false;
+		}
+	} else
+		pthread_join(*get_thread, NULL);	/* wait for the previous request's thread before reusing the slot */
+	if (pthread_create(get_thread, NULL, get_work_thread, (void *)id)) {
+		applog(LOG_ERR, "Failed to create get_work_thread");
+		free(id);
+		return false;
+	}
 	return true;
 }
 
-static bool workio_submit_work(struct workio_cmd *wc, CURL *curl)
+static void *submit_work_thread(void *userdata)
 {
+	struct io_data *io_data = (struct io_data *)userdata;	/* malloc'd by workio_submit_work; freed at out: */
+	struct workio_cmd *wc = io_data->wc;
+	CURL *curl = io_data->curl;
 	int failures = 0;
 
 	/* submit solution to bitcoin via JSON-RPC */
 	while (!submit_upstream_work(curl, wc->u.work)) {
 		if (unlikely((opt_retries >= 0) && (++failures > opt_retries))) {
 			applog(LOG_ERR, "Failed %d retries ...terminating workio thread", opt_retries);
-			return false;
+			kill_work();	/* retries exhausted: queue a WC_DIE command */
+			goto out;
 		}
 
 		/* pause, then restart work-request loop */
-		applog(LOG_ERR, "...retry after %d seconds",
+		applog(LOG_ERR, "json_rpc_call failed on submit_work, retry after %d seconds",
 			opt_fail_pause);
 		sleep(opt_fail_pause);
 	}
 
+out:
+	workio_cmd_free(wc);	/* this thread releases the command; workio_thread no longer does */
+	free(io_data);
+	return NULL;	/* value unused: the joiner passes NULL to pthread_join */
+}
+
+/* Hand wc to a new submit_work_thread, which frees it; returns false if setup fails. */
+static bool workio_submit_work(struct workio_cmd *wc, CURL *curl)
+{
+	struct io_data *id = malloc(sizeof(*id));
+
+	if (unlikely(!id)) {
+		applog(LOG_ERR, "Failed to malloc id in workio_submit_work");
+		return false;
+	}
+	id->wc = wc;
+	id->curl = curl;
+	if (!submit_thread) {
+		submit_thread = malloc(sizeof(*submit_thread));	/* room for a pthread_t, not for a pointer to one */
+		if (unlikely(!submit_thread)) {
+			applog(LOG_ERR, "Failed to malloc submit_thread in workio_submit_work");
+			free(id);	/* no thread took ownership of it */
+			return false;
+		}
+	} else
+		pthread_join(*submit_thread, NULL);	/* wait for the previous submit's thread before reusing the slot */
+	if (pthread_create(submit_thread, NULL, submit_work_thread, (void *)id)) {
+		applog(LOG_ERR, "Failed to create submit_work_thread");
+		free(id);
+		return false;
+	}
 	return true;
 }
 
@@ -484,10 +586,11 @@ static void *workio_thread(void *userdata)
 {
 	struct thr_info *mythr = userdata;
 	bool ok = true;
-	CURL *curl;
+	CURL *get_curl, *submit_curl;
 
-	curl = curl_easy_init();
-	if (unlikely(!curl)) {
+	get_curl = curl_easy_init();
+	submit_curl = curl_easy_init();
+	if (unlikely(!get_curl || !submit_curl)) {
 		applog(LOG_ERR, "CURL initialization failed");
 		return NULL;
 	}
@@ -497,7 +600,7 @@ static void *workio_thread(void *userdata)
 
 		/* wait for workio_cmd sent to us, on our queue */
 		wc = tq_pop(mythr->q, NULL);
-		if (!wc) {
+		if (unlikely(!wc)) {
 			ok = false;
 			break;
 		}
@@ -505,22 +608,21 @@ static void *workio_thread(void *userdata)
 		/* process workio_cmd */
 		switch (wc->cmd) {
 		case WC_GET_WORK:
-			ok = workio_get_work(wc, curl);
+			ok = workio_get_work(wc, get_curl);
 			break;
 		case WC_SUBMIT_WORK:
-			ok = workio_submit_work(wc, curl);
+			ok = workio_submit_work(wc, submit_curl);
 			break;
-
-		default:		/* should never happen */
+		case WC_DIE:
+		default:
 			ok = false;
 			break;
 		}
-
-		workio_cmd_free(wc);
 	}
 
 	tq_freeze(mythr->q);
-	curl_easy_cleanup(curl);
+	curl_easy_cleanup(submit_curl);
+	curl_easy_cleanup(get_curl);
 
 	return NULL;
 }
@@ -1446,8 +1548,6 @@ int main (int argc, char *argv[])
 			return 1;
 		}
 		pthread_detach(thr->pth);
-
-		sleep(1);	/* don't pound RPC server all at once */
 	}
 
 	applog(LOG_INFO, "%d cpu miner threads started, "