9 months ago |
37 |
1 |
#include "future.h" | ||
2 |
#include "coroutine.h" | ||||
3 |
#include "task.h" | ||||
4 |
#include <assert.h> | ||||
5 |
#include <stdio.h> | ||||
6 |
#include <stdlib.h> | ||||
7 |
#include <time.h> | ||||
8 |
#include <errno.h> | ||||
9 |
#include <math.h> | ||||
10 |
#include <string.h> | ||||
11 |
#include "timespec_utils.h" | ||||
12 |
|||||
13 |
|||||
14 |
Future_vfptrs_t Future_vfptrs = { | ||||
15 |
&Future_dtor, | ||||
16 |
&_Future_Await, | ||||
17 |
&_Future_SetResult | ||||
18 |
}; | ||||
19 |
|||||
20 |
typedef struct Future_WatcherSpec { | ||||
21 |
Future_Watcher watcher; | ||||
22 |
void *me; | ||||
23 |
} Future_WatcherSpec; | ||||
24 |
|||||
25 |
void Future_ctor(Future *fut){ | ||||
26 |
fut->vfptrs = &Future_vfptrs; | ||||
27 |
int r = pthread_mutex_init(&fut->mutex, NULL); | ||||
28 |
assert(r == 0); | ||||
29 |
fut->state = Future_State_Waiting; | ||||
30 |
fut->value = NULL; | ||||
31 |
fut->canceled = false; | ||||
32 |
fut->watchers = NULL; | ||||
33 |
fut->nwatchers = 0; | ||||
34 |
fut->maxwatchers = 0; | ||||
35 |
} | ||||
36 |
|||||
37 |
|||||
38 |
Future *Future_New(){ | ||||
39 |
Future *fut = malloc(sizeof(Future)); | ||||
40 |
Future_ctor(fut); | ||||
41 |
return fut; | ||||
42 |
} | ||||
43 |
|||||
44 |
|||||
45 |
void Future_dtor( | ||||
46 |
Future *fut | ||||
47 |
){ | ||||
48 |
int r; | ||||
49 |
|||||
50 |
r = pthread_mutex_lock(&fut->mutex); | ||||
51 |
assert(r == 0); | ||||
52 |
assert(fut->nwatchers == 0); | ||||
53 |
free(fut->watchers); | ||||
54 |
r = pthread_mutex_unlock(&fut->mutex); | ||||
55 |
assert(r == 0); | ||||
56 |
|||||
57 |
r = pthread_mutex_destroy(&fut->mutex); | ||||
58 |
assert(r == 0); | ||||
59 |
} | ||||
60 |
|||||
61 |
|||||
62 |
void Future_Delete( | ||||
63 |
Future *fut | ||||
64 |
){ | ||||
65 |
fut->vfptrs->dtor(fut); | ||||
66 |
free(fut); | ||||
67 |
} | ||||
68 |
|||||
69 |
|||||
70 |
void Future_AddWatcher( | ||||
71 |
Future *fut, | ||||
72 |
Future_Watcher watcher, | ||||
73 |
void *watcher_me | ||||
74 |
){ | ||||
75 |
int r = pthread_mutex_lock(&fut->mutex); | ||||
76 |
assert(r == 0); | ||||
77 |
if (fut->state == Future_State_Done) { | ||||
78 |
r = pthread_mutex_unlock(&fut->mutex); | ||||
79 |
assert(r == 0); | ||||
80 |
watcher(watcher_me, fut); | ||||
81 |
} else { | ||||
82 |
if (fut->nwatchers >= fut->maxwatchers) { | ||||
83 |
fut->maxwatchers = (fut->maxwatchers == 0) ? 4 : fut->maxwatchers * 2; | ||||
84 |
fut->watchers = realloc(fut->watchers, fut->maxwatchers * sizeof(Future_WatcherSpec)); | ||||
85 |
assert(fut->watchers); | ||||
86 |
} | ||||
87 |
fut->watchers[fut->nwatchers].watcher = watcher; | ||||
88 |
fut->watchers[fut->nwatchers].me = watcher_me; | ||||
89 |
fut->nwatchers++; | ||||
90 |
r = pthread_mutex_unlock(&fut->mutex); | ||||
91 |
assert(r == 0); | ||||
92 |
} | ||||
93 |
} | ||||
94 |
|||||
95 |
|||||
96 |
void Future_RemoveWatcher( | ||||
97 |
Future *fut, | ||||
98 |
Future_Watcher watcher, | ||||
99 |
void *watcher_me | ||||
100 |
){ | ||||
101 |
int r = pthread_mutex_lock(&fut->mutex); | ||||
102 |
assert(r == 0); | ||||
103 |
for (int i = 0; i < fut->nwatchers; i++) { | ||||
104 |
if (fut->watchers[i].watcher == watcher && fut->watchers[i].me == watcher_me) { | ||||
105 |
fut->nwatchers--; | ||||
106 |
if (i < fut->nwatchers) { | ||||
107 |
fut->watchers[i] = fut->watchers[fut->nwatchers]; | ||||
108 |
} | ||||
109 |
break; | ||||
110 |
} | ||||
111 |
} | ||||
112 |
r = pthread_mutex_unlock(&fut->mutex); | ||||
113 |
assert(r == 0); | ||||
114 |
} | ||||
115 |
|||||
116 |
|||||
117 |
static void _Future_Ready( | ||||
118 |
Future *fut | ||||
119 |
){ | ||||
120 |
int r; | ||||
121 |
fut->state = Future_State_Done; | ||||
122 |
|||||
123 |
// Take note of watchers list, and reset it | ||||
124 |
Future_WatcherSpec *watchers = fut->watchers; | ||||
125 |
int nwatchers = fut->nwatchers; | ||||
126 |
fut->watchers = NULL; | ||||
127 |
fut->nwatchers = 0; | ||||
128 |
r = pthread_mutex_unlock(&fut->mutex); | ||||
129 |
assert(r == 0); | ||||
130 |
|||||
131 |
// notify those watchers | ||||
132 |
for (int i = 0; i < nwatchers; i++) { | ||||
133 |
watchers[i].watcher(watchers[i].me, fut); | ||||
134 |
} | ||||
135 |
free(watchers); | ||||
136 |
} | ||||
137 |
|||||
138 |
|||||
139 |
void Future_SetResult( | ||||
140 |
Future *fut, | ||||
141 |
bool canceled, | ||||
142 |
void *value | ||||
143 |
){ | ||||
144 |
fut->vfptrs->set_result(fut, canceled, value); | ||||
145 |
} | ||||
146 |
|||||
147 |
|||||
148 |
bool Future_GetResult( | ||||
149 |
Future *fut, | ||||
150 |
void **res | ||||
151 |
){ | ||||
152 |
assert(fut->state == Future_State_Done); | ||||
153 |
if (res){ | ||||
154 |
*res = fut->value; | ||||
155 |
} | ||||
156 |
return fut->canceled; | ||||
157 |
} | ||||
158 |
|||||
159 |
|||||
160 |
typedef struct future_bits { | ||||
161 |
Future *fut; | ||||
162 |
Coroutine *cor; | ||||
163 |
} future_bits; | ||||
164 |
|||||
165 |
|||||
166 |
static void future_complete( | ||||
167 |
void *me, | ||||
168 |
Future *fut | ||||
169 |
){ | ||||
170 |
(void)fut; | ||||
171 |
Coroutine *cor = (Coroutine *)me; | ||||
172 |
Coroutine_Continue(cor, NULL, false); | ||||
173 |
} | ||||
174 |
|||||
175 |
|||||
176 |
static void on_yield_for_future( | ||||
177 |
void *me | ||||
178 |
){ | ||||
179 |
future_bits *bits = (future_bits *)me; | ||||
180 |
Future_AddWatcher(bits->fut, future_complete, bits->cor); | ||||
181 |
} | ||||
182 |
|||||
183 |
|||||
184 |
void _Future_Await( | ||||
185 |
Future *fut | ||||
186 |
){ | ||||
187 |
future_bits bits; | ||||
188 |
bits.fut = fut; | ||||
189 |
bits.cor = Coroutine_GetActive(); | ||||
190 |
Task *my_task = current_task; | ||||
191 |
current_task = NULL; | ||||
192 |
my_task->awaiting_future = fut; | ||||
193 |
if (my_task->canceled){ | ||||
194 |
Future_SetResult(fut, true, my_task->cancel_value); | ||||
195 |
} | ||||
196 |
Coroutine_Yield(NULL, on_yield_for_future, &bits); | ||||
197 |
my_task->awaiting_future = NULL; | ||||
198 |
current_task = my_task; | ||||
199 |
} | ||||
200 |
|||||
201 |
|||||
202 |
void _Future_SetResult( | ||||
203 |
Future *fut, | ||||
204 |
bool canceled, | ||||
205 |
void *res | ||||
206 |
){ | ||||
207 |
int r; | ||||
208 |
r = pthread_mutex_lock(&fut->mutex); | ||||
209 |
assert(r == 0); | ||||
210 |
if(fut->state == Future_State_Waiting){ | ||||
211 |
fut->canceled = canceled; | ||||
212 |
fut->value = res; | ||||
213 |
_Future_Ready(fut); | ||||
214 |
} else { | ||||
215 |
r = pthread_mutex_unlock(&fut->mutex); | ||||
216 |
assert(r == 0); | ||||
217 |
} | ||||
218 |
} | ||||
219 |
|||||
220 |
|||||
221 |
bool Future_Await( | ||||
222 |
Future *fut, | ||||
223 |
void **res | ||||
224 |
){ | ||||
225 |
fut->vfptrs->await(fut); | ||||
226 |
if (res){ | ||||
227 |
*res = fut->value; | ||||
228 |
} | ||||
229 |
return fut->canceled; | ||||
230 |
} | ||||
231 |