9 months ago |
37 |
1 |
#include "future.h" | ||
2 |
#include "coroutine.h" | ||||
3 |
#include "task.h" | ||||
4 |
#include <assert.h> | ||||
5 |
#include <stdio.h> | ||||
6 |
#include <stdlib.h> | ||||
7 |
#include <time.h> | ||||
8 |
#include <errno.h> | ||||
9 |
#include "cor_thread_local.h" | ||||
10 |
#include <math.h> | ||||
11 |
#include <string.h> | ||||
12 |
#include "timespec_utils.h" | ||||
13 |
|||||
14 |
|||||
15 |
Future_vfptrs_t Future_vfptrs = { | ||||
16 |
&Future_dtor, | ||||
17 |
&_Future_Await, | ||||
18 |
&_Future_SetResult | ||||
19 |
}; | ||||
20 |
|||||
21 |
typedef struct Future_WatcherSpec { | ||||
22 |
Future_Watcher watcher; | ||||
23 |
void *me; | ||||
24 |
} Future_WatcherSpec; | ||||
25 |
|||||
26 |
void Future_ctor(Future *fut){ | ||||
27 |
fut->vfptrs = &Future_vfptrs; | ||||
28 |
int r = pthread_mutex_init(&fut->mutex, NULL); | ||||
29 |
assert(r == 0); | ||||
30 |
fut->state = Future_State_Waiting; | ||||
31 |
fut->value = NULL; | ||||
32 |
fut->canceled = false; | ||||
33 |
fut->watchers = NULL; | ||||
34 |
fut->nwatchers = 0; | ||||
35 |
fut->maxwatchers = 0; | ||||
36 |
} | ||||
37 |
|||||
38 |
|||||
39 |
Future *Future_New(){ | ||||
40 |
Future *fut = malloc(sizeof(Future)); | ||||
41 |
Future_ctor(fut); | ||||
42 |
return fut; | ||||
43 |
} | ||||
44 |
|||||
45 |
|||||
46 |
void Future_dtor( | ||||
47 |
Future *fut | ||||
48 |
){ | ||||
49 |
int r; | ||||
50 |
|||||
51 |
r = pthread_mutex_lock(&fut->mutex); | ||||
52 |
assert(r == 0); | ||||
53 |
assert(fut->nwatchers == 0); | ||||
54 |
free(fut->watchers); | ||||
55 |
r = pthread_mutex_unlock(&fut->mutex); | ||||
56 |
assert(r == 0); | ||||
57 |
|||||
58 |
r = pthread_mutex_destroy(&fut->mutex); | ||||
59 |
assert(r == 0); | ||||
60 |
} | ||||
61 |
|||||
62 |
|||||
63 |
void Future_Delete( | ||||
64 |
Future *fut | ||||
65 |
){ | ||||
66 |
fut->vfptrs->dtor(fut); | ||||
67 |
free(fut); | ||||
68 |
} | ||||
69 |
|||||
70 |
|||||
71 |
void Future_AddWatcher( | ||||
72 |
Future *fut, | ||||
73 |
Future_Watcher watcher, | ||||
74 |
void *watcher_me | ||||
75 |
){ | ||||
76 |
int r = pthread_mutex_lock(&fut->mutex); | ||||
77 |
assert(r == 0); | ||||
78 |
if (fut->state == Future_State_Done) { | ||||
79 |
r = pthread_mutex_unlock(&fut->mutex); | ||||
80 |
assert(r == 0); | ||||
81 |
watcher(watcher_me, fut); | ||||
82 |
} else { | ||||
83 |
if (fut->nwatchers >= fut->maxwatchers) { | ||||
84 |
fut->maxwatchers = (fut->maxwatchers == 0) ? 4 : fut->maxwatchers * 2; | ||||
85 |
fut->watchers = realloc(fut->watchers, fut->maxwatchers * sizeof(Future_WatcherSpec)); | ||||
86 |
assert(fut->watchers); | ||||
87 |
} | ||||
88 |
fut->watchers[fut->nwatchers].watcher = watcher; | ||||
89 |
fut->watchers[fut->nwatchers].me = watcher_me; | ||||
90 |
fut->nwatchers++; | ||||
91 |
r = pthread_mutex_unlock(&fut->mutex); | ||||
92 |
assert(r == 0); | ||||
93 |
} | ||||
94 |
} | ||||
95 |
|||||
96 |
|||||
97 |
void Future_RemoveWatcher( | ||||
98 |
Future *fut, | ||||
99 |
Future_Watcher watcher, | ||||
100 |
void *watcher_me | ||||
101 |
){ | ||||
102 |
int r = pthread_mutex_lock(&fut->mutex); | ||||
103 |
assert(r == 0); | ||||
104 |
for (int i = 0; i < fut->nwatchers; i++) { | ||||
105 |
if (fut->watchers[i].watcher == watcher && fut->watchers[i].me == watcher_me) { | ||||
106 |
fut->nwatchers--; | ||||
107 |
if (i < fut->nwatchers) { | ||||
108 |
fut->watchers[i] = fut->watchers[fut->nwatchers]; | ||||
109 |
} | ||||
110 |
break; | ||||
111 |
} | ||||
112 |
} | ||||
113 |
r = pthread_mutex_unlock(&fut->mutex); | ||||
114 |
assert(r == 0); | ||||
115 |
} | ||||
116 |
|||||
117 |
|||||
118 |
static void _Future_Ready( | ||||
119 |
Future *fut | ||||
120 |
){ | ||||
121 |
int r; | ||||
122 |
fut->state = Future_State_Done; | ||||
123 |
|||||
124 |
// Take note of watchers list, and reset it | ||||
125 |
Future_WatcherSpec *watchers = fut->watchers; | ||||
126 |
int nwatchers = fut->nwatchers; | ||||
127 |
fut->watchers = NULL; | ||||
128 |
fut->nwatchers = 0; | ||||
129 |
r = pthread_mutex_unlock(&fut->mutex); | ||||
130 |
assert(r == 0); | ||||
131 |
|||||
132 |
// notify those watchers | ||||
133 |
for (int i = 0; i < nwatchers; i++) { | ||||
134 |
watchers[i].watcher(watchers[i].me, fut); | ||||
135 |
} | ||||
136 |
free(watchers); | ||||
137 |
} | ||||
138 |
|||||
139 |
|||||
140 |
void Future_SetResult( | ||||
141 |
Future *fut, | ||||
142 |
bool canceled, | ||||
143 |
void *value | ||||
144 |
){ | ||||
145 |
fut->vfptrs->set_result(fut, canceled, value); | ||||
146 |
} | ||||
147 |
|||||
148 |
|||||
149 |
bool Future_GetResult( | ||||
150 |
Future *fut, | ||||
151 |
void **res | ||||
152 |
){ | ||||
153 |
assert(fut->state == Future_State_Done); | ||||
154 |
if (res){ | ||||
155 |
*res = fut->value; | ||||
156 |
} | ||||
157 |
return fut->canceled; | ||||
158 |
} | ||||
159 |
|||||
160 |
|||||
161 |
typedef struct future_bits { | ||||
162 |
Future *fut; | ||||
163 |
Coroutine *cor; | ||||
164 |
} future_bits; | ||||
165 |
|||||
166 |
|||||
167 |
static void future_complete( | ||||
168 |
void *me, | ||||
169 |
Future *fut | ||||
170 |
){ | ||||
171 |
(void)fut; | ||||
172 |
Coroutine *cor = (Coroutine *)me; | ||||
173 |
Coroutine_Continue(cor, NULL, false); | ||||
174 |
} | ||||
175 |
|||||
176 |
|||||
177 |
static void on_yield_for_future( | ||||
178 |
void *me | ||||
179 |
){ | ||||
180 |
future_bits *bits = (future_bits *)me; | ||||
181 |
Future_AddWatcher(bits->fut, future_complete, bits->cor); | ||||
182 |
} | ||||
183 |
|||||
184 |
|||||
185 |
void _Future_Await( | ||||
186 |
Future *fut | ||||
187 |
){ | ||||
188 |
future_bits bits; | ||||
189 |
bits.fut = fut; | ||||
190 |
bits.cor = Coroutine_GetActive(); | ||||
191 |
Task *my_task = current_task; | ||||
192 |
current_task = NULL; | ||||
193 |
my_task->awaiting_future = fut; | ||||
194 |
if (my_task->canceled){ | ||||
195 |
Future_SetResult(fut, true, my_task->cancel_value); | ||||
196 |
} | ||||
197 |
Coroutine_Yield(NULL, on_yield_for_future, &bits); | ||||
198 |
my_task->awaiting_future = NULL; | ||||
199 |
current_task = my_task; | ||||
200 |
} | ||||
201 |
|||||
202 |
|||||
203 |
void _Future_SetResult( | ||||
204 |
Future *fut, | ||||
205 |
bool canceled, | ||||
206 |
void *res | ||||
207 |
){ | ||||
208 |
int r; | ||||
209 |
r = pthread_mutex_lock(&fut->mutex); | ||||
210 |
assert(r == 0); | ||||
211 |
if(fut->state == Future_State_Waiting){ | ||||
212 |
fut->canceled = canceled; | ||||
213 |
fut->value = res; | ||||
214 |
_Future_Ready(fut); | ||||
215 |
} else { | ||||
216 |
r = pthread_mutex_unlock(&fut->mutex); | ||||
217 |
assert(r == 0); | ||||
218 |
} | ||||
219 |
} | ||||
220 |
|||||
221 |
|||||
222 |
bool Future_Await( | ||||
223 |
Future *fut, | ||||
224 |
void **res | ||||
225 |
){ | ||||
226 |
fut->vfptrs->await(fut); | ||||
227 |
if (res){ | ||||
228 |
*res = fut->value; | ||||
229 |
} | ||||
230 |
return fut->canceled; | ||||
231 |
} | ||||
232 |