Merge branch 'master' into icarus-dev
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423
diff --git a/cgminer.c b/cgminer.c
index 68c98cb..439de24 100644
--- a/cgminer.c
+++ b/cgminer.c
@@ -3220,105 +3220,6 @@ static void *submit_work_thread(void *userdata)
applog(LOG_DEBUG, "Creating extra submit work thread");
- if (stale_work(work, true)) {
- if (opt_submit_stale)
- applog(LOG_NOTICE, "Pool %d stale share detected, submitting as user requested", pool->pool_no);
- else if (pool->submit_old)
- applog(LOG_NOTICE, "Pool %d stale share detected, submitting as pool requested", pool->pool_no);
- else {
- applog(LOG_NOTICE, "Pool %d stale share detected, discarding", pool->pool_no);
- sharelog("discard", work);
-
- mutex_lock(&stats_lock);
- total_stale++;
- pool->stale_shares++;
- total_diff_stale += work->work_difficulty;
- pool->diff_stale += work->work_difficulty;
- mutex_unlock(&stats_lock);
-
- goto out;
- }
- work->stale = true;
- }
-
- if (work->stratum) {
- struct stratum_share *sshare = calloc(sizeof(struct stratum_share), 1);
- uint32_t *hash32 = (uint32_t *)work->hash, nonce;
- bool submitted = false;
- char *noncehex;
- char s[1024];
-
- sshare->sshare_time = time(NULL);
- /* This work item is freed in parse_stratum_response */
- sshare->work = work;
- nonce = *((uint32_t *)(work->data + 76));
- noncehex = bin2hex((const unsigned char *)&nonce, 4);
- memset(s, 0, 1024);
-
- mutex_lock(&sshare_lock);
- /* Give the stratum share a unique id */
- sshare->id = swork_id++;
- mutex_unlock(&sshare_lock);
-
- sprintf(s, "{\"params\": [\"%s\", \"%s\", \"%s\", \"%s\", \"%s\"], \"id\": %d, \"method\": \"mining.submit\"}",
- pool->rpc_user, work->job_id, work->nonce2, work->ntime, noncehex, sshare->id);
- free(noncehex);
-
- applog(LOG_INFO, "Submitting share %08lx to pool %d",
- (long unsigned int)htole32(hash32[6]), pool->pool_no);
-
- /* Try resubmitting for up to 2 minutes if we fail to submit
- * once and the stratum pool nonce1 still matches suggesting
- * we may be able to resume. */
- while (time(NULL) < sshare->sshare_time + 120) {
- bool sessionid_match;
-
- if (likely(stratum_send(pool, s, strlen(s)))) {
- if (pool_tclear(pool, &pool->submit_fail))
- applog(LOG_WARNING, "Pool %d communication resumed, submitting work", pool->pool_no);
-
- mutex_lock(&sshare_lock);
- HASH_ADD_INT(stratum_shares, id, sshare);
- pool->sshares++;
- mutex_unlock(&sshare_lock);
-
- applog(LOG_DEBUG, "Successfully submitted, adding to stratum_shares db");
- submitted = true;
- break;
- }
- if (!pool_tset(pool, &pool->submit_fail) && cnx_needed(pool)) {
- applog(LOG_WARNING, "Pool %d stratum share submission failure", pool->pool_no);
- total_ro++;
- pool->remotefail_occasions++;
- }
-
- if (opt_lowmem) {
- applog(LOG_DEBUG, "Lowmem option prevents resubmitting stratum share");
- break;
- }
-
- cg_rlock(&pool->data_lock);
- sessionid_match = (pool->nonce1 && !strcmp(work->nonce1, pool->nonce1));
- cg_runlock(&pool->data_lock);
-
- if (!sessionid_match) {
- applog(LOG_DEBUG, "No matching session id for resubmitting stratum share");
- break;
- }
- /* Retry every 5 seconds */
- sleep(5);
- }
-
- if (unlikely(!submitted)) {
- applog(LOG_DEBUG, "Failed to submit stratum share, discarding");
- free_work(work);
- free(sshare);
- pool->stale_shares++;
- total_stale++;
- }
- goto out;
- }
-
ce = pop_curl_entry(pool);
/* submit solution to bitcoin via JSON-RPC */
while (!submit_upstream_work(work, ce->curl, resubmit)) {
@@ -3337,6 +3238,7 @@ static void *submit_work_thread(void *userdata)
pool->diff_stale += work->work_difficulty;
mutex_unlock(&stats_lock);
+ free_work(work);
break;
}
@@ -3344,7 +3246,7 @@ static void *submit_work_thread(void *userdata)
applog(LOG_INFO, "json_rpc_call failed on submit_work, retrying");
}
push_curl_entry(ce, pool);
-out:
+
return NULL;
}
@@ -4909,18 +4811,18 @@ static bool supports_resume(struct pool *pool)
return ret;
}
-/* One stratum thread per pool that has stratum waits on the socket checking
- * for new messages and for the integrity of the socket connection. We reset
- * the connection based on the integrity of the receive side only as the send
- * side will eventually expire data it fails to send. */
-static void *stratum_thread(void *userdata)
+/* One stratum receive thread per pool that has stratum waits on the socket
+ * checking for new messages and for the integrity of the socket connection. We
+ * reset the connection based on the integrity of the receive side only as the
+ * send side will eventually expire data it fails to send. */
+static void *stratum_rthread(void *userdata)
{
struct pool *pool = (struct pool *)userdata;
char threadname[16];
pthread_detach(pthread_self());
- snprintf(threadname, 16, "stratum/%d", pool->pool_no);
+ snprintf(threadname, 16, "StratumR/%d", pool->pool_no);
RenameThread(threadname);
while (42) {
@@ -5023,10 +4925,125 @@ out:
return NULL;
}
-static void init_stratum_thread(struct pool *pool)
+/* Each pool has one stratum send thread for sending shares to avoid many
+ * threads being created for submission since all sends need to be serialised
+ * anyway. */
+static void *stratum_sthread(void *userdata)
{
- if (unlikely(pthread_create(&pool->stratum_thread, NULL, stratum_thread, (void *)pool)))
- quit(1, "Failed to create stratum thread");
+ struct pool *pool = (struct pool *)userdata;
+ char threadname[16];
+
+ pthread_detach(pthread_self());
+
+ snprintf(threadname, 16, "StratumS/%d", pool->pool_no);
+ RenameThread(threadname);
+
+ pool->stratum_q = tq_new();
+ if (!pool->stratum_q)
+ quit(1, "Failed to create stratum_q in stratum_sthread");
+
+ while (42) {
+ struct stratum_share *sshare;
+ uint32_t *hash32, nonce;
+ struct work *work;
+ bool submitted;
+ char *noncehex;
+ char s[1024];
+
+ if (unlikely(pool->removed))
+ break;
+
+ work = tq_pop(pool->stratum_q, NULL);
+ if (unlikely(!work))
+ quit(1, "Stratum q returned empty work");
+
+ sshare = calloc(sizeof(struct stratum_share), 1);
+ hash32 = (uint32_t *)work->hash;
+ submitted = false;
+
+ sshare->sshare_time = time(NULL);
+ /* This work item is freed in parse_stratum_response */
+ sshare->work = work;
+ nonce = *((uint32_t *)(work->data + 76));
+ noncehex = bin2hex((const unsigned char *)&nonce, 4);
+ memset(s, 0, 1024);
+
+ mutex_lock(&sshare_lock);
+ /* Give the stratum share a unique id */
+ sshare->id = swork_id++;
+ mutex_unlock(&sshare_lock);
+
+ sprintf(s, "{\"params\": [\"%s\", \"%s\", \"%s\", \"%s\", \"%s\"], \"id\": %d, \"method\": \"mining.submit\"}",
+ pool->rpc_user, work->job_id, work->nonce2, work->ntime, noncehex, sshare->id);
+ free(noncehex);
+
+ applog(LOG_INFO, "Submitting share %08lx to pool %d",
+ (long unsigned int)htole32(hash32[6]), pool->pool_no);
+
+ /* Try resubmitting for up to 2 minutes if we fail to submit
+ * once and the stratum pool nonce1 still matches suggesting
+ * we may be able to resume. */
+ while (time(NULL) < sshare->sshare_time + 120) {
+ bool sessionid_match;
+
+ if (likely(stratum_send(pool, s, strlen(s)))) {
+ if (pool_tclear(pool, &pool->submit_fail))
+ applog(LOG_WARNING, "Pool %d communication resumed, submitting work", pool->pool_no);
+
+ mutex_lock(&sshare_lock);
+ HASH_ADD_INT(stratum_shares, id, sshare);
+ pool->sshares++;
+ mutex_unlock(&sshare_lock);
+
+ applog(LOG_DEBUG, "Successfully submitted, adding to stratum_shares db");
+ submitted = true;
+ break;
+ }
+ if (!pool_tset(pool, &pool->submit_fail) && cnx_needed(pool)) {
+ applog(LOG_WARNING, "Pool %d stratum share submission failure", pool->pool_no);
+ total_ro++;
+ pool->remotefail_occasions++;
+ }
+
+ if (opt_lowmem) {
+ applog(LOG_DEBUG, "Lowmem option prevents resubmitting stratum share");
+ break;
+ }
+
+ cg_rlock(&pool->data_lock);
+ sessionid_match = (pool->nonce1 && !strcmp(work->nonce1, pool->nonce1));
+ cg_runlock(&pool->data_lock);
+
+ if (!sessionid_match) {
+ applog(LOG_DEBUG, "No matching session id for resubmitting stratum share");
+ break;
+ }
+ /* Retry every 5 seconds */
+ sleep(5);
+ }
+
+ if (unlikely(!submitted)) {
+ applog(LOG_DEBUG, "Failed to submit stratum share, discarding");
+ free_work(work);
+ free(sshare);
+ pool->stale_shares++;
+ total_stale++;
+ }
+ }
+
+ /* Freeze the work queue but don't free up its memory in case there is
+ * work still trying to be submitted to the removed pool. */
+ tq_freeze(pool->stratum_q);
+
+ return NULL;
+}
+
+static void init_stratum_threads(struct pool *pool)
+{
+ if (unlikely(pthread_create(&pool->stratum_sthread, NULL, stratum_sthread, (void *)pool)))
+ quit(1, "Failed to create stratum sthread");
+ if (unlikely(pthread_create(&pool->stratum_rthread, NULL, stratum_rthread, (void *)pool)))
+ quit(1, "Failed to create stratum rthread");
}
static void *longpoll_thread(void *userdata);
@@ -5069,7 +5086,7 @@ retry_stratum:
bool ret = initiate_stratum(pool) && auth_stratum(pool);
if (ret)
- init_stratum_thread(pool);
+ init_stratum_threads(pool);
else
pool_tclear(pool, &pool->stratum_init);
return ret;
@@ -5476,13 +5493,45 @@ static struct work *get_work(struct thr_info *thr, const int thr_id)
void submit_work_async(struct work *work_in, struct timeval *tv_work_found)
{
struct work *work = copy_work(work_in);
+ struct pool *pool = work->pool;
pthread_t submit_thread;
if (tv_work_found)
copy_time(&work->tv_work_found, tv_work_found);
- applog(LOG_DEBUG, "Pushing submit work to work thread");
- if (unlikely(pthread_create(&submit_thread, NULL, submit_work_thread, (void *)work)))
- quit(1, "Failed to create submit_work_thread");
+
+ if (stale_work(work, true)) {
+ if (opt_submit_stale)
+ applog(LOG_NOTICE, "Pool %d stale share detected, submitting as user requested", pool->pool_no);
+ else if (pool->submit_old)
+ applog(LOG_NOTICE, "Pool %d stale share detected, submitting as pool requested", pool->pool_no);
+ else {
+ applog(LOG_NOTICE, "Pool %d stale share detected, discarding", pool->pool_no);
+ sharelog("discard", work);
+
+ mutex_lock(&stats_lock);
+ total_stale++;
+ pool->stale_shares++;
+ total_diff_stale += work->work_difficulty;
+ pool->diff_stale += work->work_difficulty;
+ mutex_unlock(&stats_lock);
+
+ free_work(work);
+ return;
+ }
+ work->stale = true;
+ }
+
+ if (work->stratum) {
+ applog(LOG_DEBUG, "Pushing pool %d work to stratum queue", pool->pool_no);
+ if (unlikely(!tq_push(pool->stratum_q, work))) {
+ applog(LOG_DEBUG, "Discarding work from removed pool");
+ free_work(work);
+ }
+ } else {
+ applog(LOG_DEBUG, "Pushing submit work to work thread");
+ if (unlikely(pthread_create(&submit_thread, NULL, submit_work_thread, (void *)work)))
+ quit(1, "Failed to create submit_work_thread");
+ }
}
void inc_hw_errors(struct thr_info *thr)
diff --git a/miner.h b/miner.h
index 471b372..2e6da64 100644
--- a/miner.h
+++ b/miner.h
@@ -1126,8 +1126,10 @@ struct pool {
bool stratum_init;
bool stratum_notify;
struct stratum_work swork;
- pthread_t stratum_thread;
+ pthread_t stratum_sthread;
+ pthread_t stratum_rthread;
pthread_mutex_t stratum_lock;
+ struct thread_q *stratum_q;
int sshares; /* stratum shares submitted waiting on response */
/* GBT variables */
diff --git a/util.c b/util.c
index 79ed856..07669e3 100644
--- a/util.c
+++ b/util.c
@@ -1084,12 +1084,6 @@ static void recalloc_sock(struct pool *pool, size_t len)
pool->sockbuf_size = new;
}
-enum recv_ret {
- RECV_OK,
- RECV_CLOSED,
- RECV_RECVFAIL
-};
-
/* Peeks at a socket to find the first end of line and then reads just that
* from the socket and returns that as a malloced char */
char *recv_line(struct pool *pool)
@@ -1098,7 +1092,6 @@ char *recv_line(struct pool *pool)
char *tok, *sret = NULL;
if (!strstr(pool->sockbuf, "\n")) {
- enum recv_ret ret = RECV_OK;
struct timeval rstart, now;
cgtime(&rstart);
@@ -1107,7 +1100,6 @@ char *recv_line(struct pool *pool)
goto out;
}
- mutex_lock(&pool->stratum_lock);
do {
char s[RBUFSIZE];
size_t slen;
@@ -1116,12 +1108,12 @@ char *recv_line(struct pool *pool)
memset(s, 0, RBUFSIZE);
n = recv(pool->sock, s, RECVSIZE, 0);
if (!n) {
- ret = RECV_CLOSED;
+ applog(LOG_DEBUG, "Socket closed waiting in recv_line");
break;
}
if (n < 0) {
if (!sock_blocks() || !socket_full(pool, false)) {
- ret = RECV_RECVFAIL;
+ applog(LOG_DEBUG, "Failed to recv sock in recv_line");
break;
}
} else {
@@ -1131,19 +1123,6 @@ char *recv_line(struct pool *pool)
}
cgtime(&now);
} while (tdiff(&now, &rstart) < 60 && !strstr(pool->sockbuf, "\n"));
- mutex_unlock(&pool->stratum_lock);
-
- switch (ret) {
- default:
- case RECV_OK:
- break;
- case RECV_CLOSED:
- applog(LOG_DEBUG, "Socket closed waiting in recv_line");
- goto out;
- case RECV_RECVFAIL:
- applog(LOG_DEBUG, "Failed to recv sock in recv_line");
- goto out;
- }
}
buflen = strlen(pool->sockbuf);