Implement a scaleable networking framework designed to cope with any sized network requirements, yet minimise the number of connections being reopened. Do this by creating a ring buffer linked list of curl handles to be used by getwork, recruiting extra handles when none is immediately available.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216
diff --git a/cgminer.c b/cgminer.c
index ad8ad96..ef0187c 100644
--- a/cgminer.c
+++ b/cgminer.c
@@ -379,7 +379,6 @@ static void sharelog(const char*disposition, const struct work*work)
}
static void *submit_work_thread(void *userdata);
-static void *get_work_thread(void *userdata);
static void add_pool(void)
{
@@ -392,13 +391,13 @@ static void add_pool(void)
pools[total_pools++] = pool;
if (unlikely(pthread_mutex_init(&pool->pool_lock, NULL)))
quit(1, "Failed to pthread_mutex_init in add_pool");
+ INIT_LIST_HEAD(&pool->curlring);
+
/* Make sure the pool doesn't think we've been idle since time 0 */
pool->tv_idle.tv_sec = ~0UL;
if (unlikely(pthread_create(&pool->submit_thread, NULL, submit_work_thread, (void *)pool)))
quit(1, "Failed to create pool submit thread");
- if (unlikely(pthread_create(&pool->getwork_thread, NULL, get_work_thread, (void *)pool)))
- quit(1, "Failed to create pool getwork thread");
}
/* Pool variant of test and set */
@@ -1957,77 +1956,46 @@ static void sighandler(int __maybe_unused sig)
kill_work();
}
-/* One get work thread is created per pool, so as to use one curl handle for
- * all getwork reqeusts from the same pool, minimising connections opened, but
- * separate from the submit work curl handle to not delay share submissions due
- * to getwork traffic */
-static void *get_work_thread(void *userdata)
+/* Allocate a new curl_ent, give it a fresh curl handle and add it to the
+ * pool's curlring. Calls quit() if either the allocation or curl_easy_init()
+ * fails. Called with pool_lock held */
+static void recruit_curl(struct pool *pool)
{
- struct pool *pool = (struct pool *)userdata;
- struct workio_cmd *wc;
- CURL *curl;
-
- pthread_detach(pthread_self());
-
- /* getwork_q memory never freed */
- pool->getwork_q = tq_new();
- if (!pool->getwork_q)
- quit(1, "Failed to tq_new in get_work_thread");
-
- curl = curl_easy_init();
- if (unlikely(!curl))
- quit(1, "Failed to initialise pool getwork CURL");
+ struct curl_ent *ce = calloc(1, sizeof(*ce));
- while ((wc = tq_pop(pool->getwork_q, NULL)) != NULL) {
- struct work *ret_work;
- int failures = 0;
+ /* Check the allocation before dereferencing ce to store the handle */
+ if (unlikely(!ce))
+ quit(1, "Failed to calloc in recruit_curl");
+ ce->curl = curl_easy_init();
+ if (unlikely(!ce->curl))
+ quit(1, "Failed to init in recruit_curl");
- ret_work = make_work();
-
- if (wc->thr)
- ret_work->thr = wc->thr;
- else
- ret_work->thr = NULL;
-
- ret_work->pool = pool;
-
- /* obtain new work from bitcoin via JSON-RPC */
- while (!get_upstream_work(ret_work, curl)) {
- if (unlikely((opt_retries >= 0) && (++failures > opt_retries))) {
- applog(LOG_ERR, "json_rpc_call failed, terminating workio thread");
- free_work(ret_work);
- kill_work();
- break;
- }
+ list_add(&ce->node, &pool->curlring);
+ applog(LOG_DEBUG, "Recruited new curl for pool %d", pool->pool_no);
+}
- /* pause, then restart work-request loop */
- applog(LOG_DEBUG, "json_rpc_call failed on get work, retry after %d seconds",
- fail_pause);
- sleep(fail_pause);
- fail_pause += opt_fail_pause;
- }
- fail_pause = opt_fail_pause;
+/* Take a curl_ent off the pool's curlring, recruiting a new one first if the
+ * ring is empty. Takes and releases pool_lock itself. The returned entry is
+ * unlinked from the ring; hand it back with push_curl_entry() */
+static struct curl_ent *pop_curl_entry(struct pool *pool)
+{
+ struct curl_ent *ce;
- applog(LOG_DEBUG, "Pushing work to requesting thread");
+ mutex_lock(&pool->pool_lock);
+ /* recruit_curl() must be called with pool_lock held, as it is here */
+ if (list_empty(&pool->curlring))
+ recruit_curl(pool);
+ ce = list_entry(pool->curlring.next, struct curl_ent, node);
+ list_del(&ce->node);
+ mutex_unlock(&pool->pool_lock);
- /* send work to requesting thread */
- if (unlikely(!tq_push(thr_info[stage_thr_id].q, ret_work))) {
- applog(LOG_ERR, "Failed to tq_push work in workio_get_work");
- kill_work();
- free_work(ret_work);
- }
- workio_cmd_free(wc);
- }
+ return ce;
+}
- curl_easy_cleanup(curl);
- return NULL;
+static void push_curl_entry(struct curl_ent *ce, struct pool *pool)
+{
+ mutex_lock(&pool->pool_lock); /* take pool_lock around the curlring update */
+ list_add(&ce->node, &pool->curlring); /* put ce back on the pool's curlring so a later pop_curl_entry() can reuse it */
+ mutex_unlock(&pool->pool_lock);
}
-static void *get_extra_work(void *userdata)
+static void *get_work_thread(void *userdata)
{
struct workio_cmd *wc = (struct workio_cmd *)userdata;
- struct work *ret_work = make_work();;
- CURL *curl = curl_easy_init();
+ struct work *ret_work = make_work();
+ struct curl_ent *ce;
+ struct pool *pool;
int failures = 0;
pthread_detach(pthread_self());
@@ -2039,10 +2007,11 @@ static void *get_extra_work(void *userdata)
else
ret_work->thr = NULL;
- ret_work->pool = select_pool(wc->lagging);
+ pool = ret_work->pool = select_pool(wc->lagging);
+ ce = pop_curl_entry(pool);
/* obtain new work from bitcoin via JSON-RPC */
- while (!get_upstream_work(ret_work, curl)) {
+ while (!get_upstream_work(ret_work, ce->curl)) {
if (unlikely((opt_retries >= 0) && (++failures > opt_retries))) {
applog(LOG_ERR, "json_rpc_call failed, terminating workio thread");
free_work(ret_work);
@@ -2069,7 +2038,7 @@ static void *get_extra_work(void *userdata)
out:
workio_cmd_free(wc);
- curl_easy_cleanup(curl);
+ push_curl_entry(ce, pool);
return NULL;
}
@@ -2078,13 +2047,9 @@ out:
* requests */
static bool workio_get_work(struct workio_cmd *wc)
{
- struct pool *pool = select_pool(wc->lagging);
pthread_t get_thread;
- if (list_empty(&pool->getwork_q->q) || pool->submit_fail)
- return tq_push(pool->getwork_q, wc);
-
- if (unlikely(pthread_create(&get_thread, NULL, get_extra_work, (void *)wc))) {
+ if (unlikely(pthread_create(&get_thread, NULL, get_work_thread, (void *)wc))) {
applog(LOG_ERR, "Failed to create get_work_thread");
return false;
}
@@ -4450,6 +4415,8 @@ int add_pool_details(bool live, char *url, char *user, char *pass)
pool->prio = total_pools;
if (unlikely(pthread_mutex_init(&pool->pool_lock, NULL)))
quit (1, "Failed to pthread_mutex_init in input_pool");
+ INIT_LIST_HEAD(&pool->curlring);
+
pool->rpc_url = url;
pool->rpc_user = user;
pool->rpc_pass = pass;
@@ -4462,8 +4429,6 @@ int add_pool_details(bool live, char *url, char *user, char *pass)
if (unlikely(pthread_create(&pool->submit_thread, NULL, submit_work_thread, (void *)pool)))
quit(1, "Failed to create pool submit thread");
- if (unlikely(pthread_create(&pool->getwork_thread, NULL, get_work_thread, (void *)pool)))
- quit(1, "Failed to create pool getwork thread");
/* Test the pool is not idle if we're live running, otherwise
* it will be tested separately */
diff --git a/miner.h b/miner.h
index 6c750be..3f6f9ba 100644
--- a/miner.h
+++ b/miner.h
@@ -598,6 +598,11 @@ typedef struct {
} dev_blk_ctx;
#endif
+struct curl_ent { /* One entry in a pool's list of curl handles (pool->curlring) */
+ CURL *curl; /* Handle obtained from curl_easy_init() in recruit_curl() */
+ struct list_head node; /* Linkage used by list_add()/list_del() on pool->curlring */
+};
+
struct pool {
int pool_no;
int prio;
@@ -634,6 +639,8 @@ struct pool {
pthread_t longpoll_thread;
pthread_t submit_thread;
pthread_t getwork_thread;
+
+ struct list_head curlring;
};
struct work {