1 contributor
232 lines4.7 KB
Newer
Older
-
+
commited
{line.log.rev}
on
9 months ago
37
1
#include "future.h"
2
#include "coroutine.h"
3
#include "task.h"
4
#include <assert.h>
5
#include <stdio.h>
6
#include <stdlib.h>
7
#include <time.h>
8
#include <errno.h>
9
#include "cor_thread_local.h"
10
#include <math.h>
11
#include <string.h>
12
#include "timespec_utils.h"
13
14
15
Future_vfptrs_t Future_vfptrs = {
16
&Future_dtor,
17
&_Future_Await,
18
&_Future_SetResult
19
};
20
21
typedef struct Future_WatcherSpec {
22
Future_Watcher watcher;
23
void *me;
24
} Future_WatcherSpec;
25
26
void Future_ctor(Future *fut){
27
fut->vfptrs = &Future_vfptrs;
28
int r = pthread_mutex_init(&fut->mutex, NULL);
29
assert(r == 0);
30
fut->state = Future_State_Waiting;
31
fut->value = NULL;
32
fut->canceled = false;
33
fut->watchers = NULL;
34
fut->nwatchers = 0;
35
fut->maxwatchers = 0;
36
}
37
38
39
Future *Future_New(){
40
Future *fut = malloc(sizeof(Future));
41
Future_ctor(fut);
42
return fut;
43
}
44
45
46
void Future_dtor(
47
Future *fut
48
){
49
int r;
50
51
r = pthread_mutex_lock(&fut->mutex);
52
assert(r == 0);
53
assert(fut->nwatchers == 0);
54
free(fut->watchers);
55
r = pthread_mutex_unlock(&fut->mutex);
56
assert(r == 0);
57
58
r = pthread_mutex_destroy(&fut->mutex);
59
assert(r == 0);
60
}
61
62
63
void Future_Delete(
64
Future *fut
65
){
66
fut->vfptrs->dtor(fut);
67
free(fut);
68
}
69
70
71
void Future_AddWatcher(
72
Future *fut,
73
Future_Watcher watcher,
74
void *watcher_me
75
){
76
int r = pthread_mutex_lock(&fut->mutex);
77
assert(r == 0);
78
if (fut->state == Future_State_Done) {
79
r = pthread_mutex_unlock(&fut->mutex);
80
assert(r == 0);
81
watcher(watcher_me, fut);
82
} else {
83
if (fut->nwatchers >= fut->maxwatchers) {
84
fut->maxwatchers = (fut->maxwatchers == 0) ? 4 : fut->maxwatchers * 2;
85
fut->watchers = realloc(fut->watchers, fut->maxwatchers * sizeof(Future_WatcherSpec));
86
assert(fut->watchers);
87
}
88
fut->watchers[fut->nwatchers].watcher = watcher;
89
fut->watchers[fut->nwatchers].me = watcher_me;
90
fut->nwatchers++;
91
r = pthread_mutex_unlock(&fut->mutex);
92
assert(r == 0);
93
}
94
}
95
96
97
void Future_RemoveWatcher(
98
Future *fut,
99
Future_Watcher watcher,
100
void *watcher_me
101
){
102
int r = pthread_mutex_lock(&fut->mutex);
103
assert(r == 0);
104
for (int i = 0; i < fut->nwatchers; i++) {
105
if (fut->watchers[i].watcher == watcher && fut->watchers[i].me == watcher_me) {
106
fut->nwatchers--;
107
if (i < fut->nwatchers) {
108
fut->watchers[i] = fut->watchers[fut->nwatchers];
109
}
110
break;
111
}
112
}
113
r = pthread_mutex_unlock(&fut->mutex);
114
assert(r == 0);
115
}
116
117
118
static void _Future_Ready(
119
Future *fut
120
){
121
int r;
122
fut->state = Future_State_Done;
123
124
// Take note of watchers list, and reset it
125
Future_WatcherSpec *watchers = fut->watchers;
126
int nwatchers = fut->nwatchers;
127
fut->watchers = NULL;
128
fut->nwatchers = 0;
129
r = pthread_mutex_unlock(&fut->mutex);
130
assert(r == 0);
131
132
// notify those watchers
133
for (int i = 0; i < nwatchers; i++) {
134
watchers[i].watcher(watchers[i].me, fut);
135
}
136
free(watchers);
137
}
138
139
140
void Future_SetResult(
141
Future *fut,
142
bool canceled,
143
void *value
144
){
145
fut->vfptrs->set_result(fut, canceled, value);
146
}
147
148
149
bool Future_GetResult(
150
Future *fut,
151
void **res
152
){
153
assert(fut->state == Future_State_Done);
154
if (res){
155
*res = fut->value;
156
}
157
return fut->canceled;
158
}
159
160
161
typedef struct future_bits {
162
Future *fut;
163
Coroutine *cor;
164
} future_bits;
165
166
167
static void future_complete(
168
void *me,
169
Future *fut
170
){
171
(void)fut;
172
Coroutine *cor = (Coroutine *)me;
173
Coroutine_Continue(cor, NULL, false);
174
}
175
176
177
static void on_yield_for_future(
178
void *me
179
){
180
future_bits *bits = (future_bits *)me;
181
Future_AddWatcher(bits->fut, future_complete, bits->cor);
182
}
183
184
185
void _Future_Await(
186
Future *fut
187
){
188
future_bits bits;
189
bits.fut = fut;
190
bits.cor = Coroutine_GetActive();
191
Task *my_task = current_task;
192
current_task = NULL;
193
my_task->awaiting_future = fut;
194
if (my_task->canceled){
195
Future_SetResult(fut, true, my_task->cancel_value);
196
}
197
Coroutine_Yield(NULL, on_yield_for_future, &bits);
198
my_task->awaiting_future = NULL;
199
current_task = my_task;
200
}
201
202
203
void _Future_SetResult(
204
Future *fut,
205
bool canceled,
206
void *res
207
){
208
int r;
209
r = pthread_mutex_lock(&fut->mutex);
210
assert(r == 0);
211
if(fut->state == Future_State_Waiting){
212
fut->canceled = canceled;
213
fut->value = res;
214
_Future_Ready(fut);
215
} else {
216
r = pthread_mutex_unlock(&fut->mutex);
217
assert(r == 0);
218
}
219
}
220
221
222
bool Future_Await(
223
Future *fut,
224
void **res
225
){
226
fut->vfptrs->await(fut);
227
if (res){
228
*res = fut->value;
229
}
230
return fut->canceled;
231
}
232