Faster hardware can easily outstrip the speed at which we can get work and submit shares when using only one connection per pool. Test the queued list to see whether any getwork or submit requests are already queued, and if they are, start recruiting extra connections by spawning new threads. This lets us reuse network connections at low loads but recruit new open connections as they're needed, so that cgminer can scale to hardware of any size.
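Conceptually, the change boils down to the dispatch pattern below. This is a minimal standalone sketch, not cgminer code: `struct request`, `queue_is_empty()` and `push_to_persistent_thread()` are illustrative stand-ins for `struct workio_cmd`, `list_empty()` on the pool's queue and `tq_push()`.

#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
#include <unistd.h>

/* Illustrative stand-ins: cgminer really uses struct workio_cmd, the
 * pool's tq_push() thread queue and list_empty() on its list head. */
struct request { int id; };

static int pending;	/* depth of the persistent thread's queue;
			 * simplified - the real code holds the queue mutex */

static bool queue_is_empty(void)	/* stands in for list_empty(&q->q) */
{
	return pending == 0;
}

static bool push_to_persistent_thread(struct request *req) /* stands in for tq_push() */
{
	pending++;
	printf("request %d queued for the persistent connection\n", req->id);
	return true;
}

/* One-shot worker: services a single request on its own connection and
 * exits.  It detaches itself, so nothing ever has to join it. */
static void *one_shot_worker(void *userdata)
{
	struct request *req = userdata;

	pthread_detach(pthread_self());
	printf("request %d serviced on an extra connection\n", req->id);
	return NULL;
}

/* The pattern the patch applies to both getwork and submit: reuse the
 * persistent connection while it keeps up, recruit an extra thread (and
 * with it an extra connection) once requests start backing up. */
static bool dispatch(struct request *req)
{
	pthread_t tid;

	if (queue_is_empty())
		return push_to_persistent_thread(req);

	if (pthread_create(&tid, NULL, one_shot_worker, req) != 0) {
		fprintf(stderr, "failed to create extra worker\n");
		return false;
	}
	return true;
}

int main(void)
{
	struct request a = { 1 }, b = { 2 };

	dispatch(&a);	/* queue empty: goes to the persistent thread */
	dispatch(&b);	/* backlog present: spawns an extra thread */
	sleep(1);	/* crude: let the detached worker finish */
	return 0;
}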
diff --git a/cgminer.c b/cgminer.c
index ef13b0d..af53532 100644
--- a/cgminer.c
+++ b/cgminer.c
@@ -1585,7 +1585,7 @@ bool regeneratehash(const struct work *work)
return false;
}
-static bool submit_upstream_work(const struct work *work)
+static bool submit_upstream_work(const struct work *work, CURL *curl)
{
char *hexstr = NULL;
json_t *val, *res;
@@ -1594,7 +1594,6 @@ static bool submit_upstream_work(const struct work *work)
int thr_id = work->thr_id;
struct cgpu_info *cgpu = thr_info[thr_id].cgpu;
struct pool *pool = work->pool;
- CURL *curl = pool->submit_curl;
bool rolltime;
uint32_t *hash32;
char hashshow[64+1] = "";
@@ -1770,10 +1769,9 @@ static void get_benchmark_work(struct work *work)
memcpy(work, &bench_block, min_size);
}
-static bool get_upstream_work(struct work *work)
+static bool get_upstream_work(struct work *work, CURL *curl)
{
struct pool *pool = work->pool;
- CURL *curl = pool->getwork_curl;
json_t *val = NULL;
bool rc = false;
int retries = 0;
@@ -1996,7 +1994,7 @@ static void *get_work_thread(void *userdata)
ret_work->pool = pool;
/* obtain new work from bitcoin via JSON-RPC */
- while (!get_upstream_work(ret_work)) {
+ while (!get_upstream_work(ret_work, pool->getwork_curl)) {
if (unlikely((opt_retries >= 0) && (++failures > opt_retries))) {
applog(LOG_ERR, "json_rpc_call failed, terminating workio thread");
free_work(ret_work);
@@ -2026,11 +2024,72 @@ static void *get_work_thread(void *userdata)
return NULL;
}
+static void *get_extra_work(void *userdata)
+{
+ struct workio_cmd *wc = (struct workio_cmd *)userdata;
+ struct work *ret_work = make_work();
+ CURL *curl = curl_easy_init();
+ int failures = 0;
+
+ pthread_detach(pthread_self());
+
+ applog(LOG_DEBUG, "Creating extra get work thread");
+
+ if (wc->thr)
+ ret_work->thr = wc->thr;
+ else
+ ret_work->thr = NULL;
+
+ ret_work->pool = select_pool(wc->lagging);
+
+ /* obtain new work from bitcoin via JSON-RPC */
+ while (!get_upstream_work(ret_work, curl)) {
+ if (unlikely((opt_retries >= 0) && (++failures > opt_retries))) {
+ applog(LOG_ERR, "json_rpc_call failed, terminating workio thread");
+ free_work(ret_work);
+ kill_work();
+ goto out;
+ }
+
+ /* pause, then restart work-request loop */
+ applog(LOG_DEBUG, "json_rpc_call failed on get work, retry after %d seconds",
+ fail_pause);
+ sleep(fail_pause);
+ fail_pause += opt_fail_pause;
+ }
+ fail_pause = opt_fail_pause;
+
+ applog(LOG_DEBUG, "Pushing work to requesting thread");
+
+ /* send work to requesting thread */
+ if (unlikely(!tq_push(thr_info[stage_thr_id].q, ret_work))) {
+ applog(LOG_ERR, "Failed to tq_push work in workio_get_work");
+ kill_work();
+ free_work(ret_work);
+ }
+
+out:
+ workio_cmd_free(wc);
+ curl_easy_cleanup(curl);
+ return NULL;
+}
+
+/* As per the submit work system, we try to reuse the existing curl handles,
+ * but start recruiting extra connections if we start accumulating queued
+ * requests */
static bool workio_get_work(struct workio_cmd *wc)
{
struct pool *pool = select_pool(wc->lagging);
+ pthread_t get_thread;
- return tq_push(pool->getwork_q, wc);
+ if (list_empty(&pool->getwork_q->q))
+ return tq_push(pool->getwork_q, wc);
+
+ if (unlikely(pthread_create(&get_thread, NULL, get_extra_work, (void *)wc))) {
+ applog(LOG_ERR, "Failed to create get_work_thread");
+ return false;
+ }
+ return true;
}
static bool stale_work(struct work *work, bool share)
@@ -2100,7 +2159,7 @@ static void *submit_work_thread(void *userdata)
}
/* submit solution to bitcoin via JSON-RPC */
- while (!submit_upstream_work(work)) {
+ while (!submit_upstream_work(work, pool->submit_curl)) {
if (!opt_submit_stale && stale_work(work, true) && !pool->submit_old) {
applog(LOG_NOTICE, "Stale share detected on submit retry, discarding");
total_stale++;
@@ -2126,9 +2185,75 @@ static void *submit_work_thread(void *userdata)
return NULL;
}
+static void *submit_extra_work(void *userdata)
+{
+ struct workio_cmd *wc = (struct workio_cmd *)userdata;
+ struct work *work = wc->u.work;
+ struct pool *pool = work->pool;
+ CURL *curl = curl_easy_init();
+ int failures = 0;
+
+ pthread_detach(pthread_self());
+
+ applog(LOG_DEBUG, "Creating extra submit work thread");
+
+ if (stale_work(work, true)) {
+ if (opt_submit_stale)
+ applog(LOG_NOTICE, "Stale share detected, submitting as user requested");
+ else if (pool->submit_old)
+ applog(LOG_NOTICE, "Stale share detected, submitting as pool requested");
+ else {
+ applog(LOG_NOTICE, "Stale share detected, discarding");
+ sharelog("discard", work);
+ total_stale++;
+ pool->stale_shares++;
+ goto out;
+ }
+ }
+
+ /* submit solution to bitcoin via JSON-RPC */
+ while (!submit_upstream_work(work, curl)) {
+ if (!opt_submit_stale && stale_work(work, true)) {
+ applog(LOG_NOTICE, "Stale share detected, discarding");
+ total_stale++;
+ pool->stale_shares++;
+ break;
+ }
+ if (unlikely((opt_retries >= 0) && (++failures > opt_retries))) {
+ applog(LOG_ERR, "Failed %d retries ...terminating workio thread", opt_retries);
+ kill_work();
+ break;
+ }
+
+ /* pause, then restart work-request loop */
+ applog(LOG_INFO, "json_rpc_call failed on submit_work, retry after %d seconds",
+ fail_pause);
+ sleep(fail_pause);
+ fail_pause += opt_fail_pause;
+ }
+ fail_pause = opt_fail_pause;
+out:
+ workio_cmd_free(wc);
+ curl_easy_cleanup(curl);
+ return NULL;
+}
+
+/* We try to reuse curl handles as much as possible, but if there is already
+ * work queued to be submitted, we start generating extra handles to submit
+ * the shares to avoid ever increasing backlogs. This allows us to scale to
+ * any size hardware */
static bool workio_submit_work(struct workio_cmd *wc)
{
- return tq_push(wc->u.work->pool->submit_q, wc);
+ pthread_t submit_thread;
+
+ if (list_empty(&wc->u.work->pool->submit_q->q))
+ return tq_push(wc->u.work->pool->submit_q, wc);
+
+ if (unlikely(pthread_create(&submit_thread, NULL, submit_extra_work, (void *)wc))) {
+ applog(LOG_ERR, "Failed to create submit_work_thread");
+ return false;
+ }
+ return true;
}
/* Find the pool that currently has the highest priority */
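Note the handle lifecycle in the two new thread functions: each recruited thread calls curl_easy_init() on entry and curl_easy_cleanup() before exiting, rather than touching the pool's cached getwork_curl or submit_curl. A libcurl easy handle must never be used from more than one thread at a time, so a throwaway handle per extra thread is the safe choice, at the cost of a fresh connection handshake for each overflow request. The cached handles remain the fast path whenever the queue is empty.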