1 contributor
231 lines4.6 KB
1#include "future.h"
2#include "coroutine.h"
3#include "task.h"
4#include <assert.h>
5#include <stdio.h>
6#include <stdlib.h>
7#include <time.h>
8#include <errno.h>
9#include <math.h>
10#include <string.h>
11#include "timespec_utils.h"
12
13
14Future_vfptrs_t Future_vfptrs = {
15 &Future_dtor,
16 &_Future_Await,
17 &_Future_SetResult
18};
19
20typedef struct Future_WatcherSpec {
21 Future_Watcher watcher;
22 void *me;
23} Future_WatcherSpec;
24
25void Future_ctor(Future *fut){
26 fut->vfptrs = &Future_vfptrs;
27 int r = pthread_mutex_init(&fut->mutex, NULL);
28 assert(r == 0);
29 fut->state = Future_State_Waiting;
30 fut->value = NULL;
31 fut->canceled = false;
32 fut->watchers = NULL;
33 fut->nwatchers = 0;
34 fut->maxwatchers = 0;
35}
36
37
38Future *Future_New(){
39 Future *fut = malloc(sizeof(Future));
40 Future_ctor(fut);
41 return fut;
42}
43
44
45void Future_dtor(
46 Future *fut
47){
48 int r;
49
50 r = pthread_mutex_lock(&fut->mutex);
51 assert(r == 0);
52 assert(fut->nwatchers == 0);
53 free(fut->watchers);
54 r = pthread_mutex_unlock(&fut->mutex);
55 assert(r == 0);
56
57 r = pthread_mutex_destroy(&fut->mutex);
58 assert(r == 0);
59}
60
61
62void Future_Delete(
63 Future *fut
64){
65 fut->vfptrs->dtor(fut);
66 free(fut);
67}
68
69
70void Future_AddWatcher(
71 Future *fut,
72 Future_Watcher watcher,
73 void *watcher_me
74){
75 int r = pthread_mutex_lock(&fut->mutex);
76 assert(r == 0);
77 if (fut->state == Future_State_Done) {
78 r = pthread_mutex_unlock(&fut->mutex);
79 assert(r == 0);
80 watcher(watcher_me, fut);
81 } else {
82 if (fut->nwatchers >= fut->maxwatchers) {
83 fut->maxwatchers = (fut->maxwatchers == 0) ? 4 : fut->maxwatchers * 2;
84 fut->watchers = realloc(fut->watchers, fut->maxwatchers * sizeof(Future_WatcherSpec));
85 assert(fut->watchers);
86 }
87 fut->watchers[fut->nwatchers].watcher = watcher;
88 fut->watchers[fut->nwatchers].me = watcher_me;
89 fut->nwatchers++;
90 r = pthread_mutex_unlock(&fut->mutex);
91 assert(r == 0);
92 }
93}
94
95
96void Future_RemoveWatcher(
97 Future *fut,
98 Future_Watcher watcher,
99 void *watcher_me
100){
101 int r = pthread_mutex_lock(&fut->mutex);
102 assert(r == 0);
103 for (int i = 0; i < fut->nwatchers; i++) {
104 if (fut->watchers[i].watcher == watcher && fut->watchers[i].me == watcher_me) {
105 fut->nwatchers--;
106 if (i < fut->nwatchers) {
107 fut->watchers[i] = fut->watchers[fut->nwatchers];
108 }
109 break;
110 }
111 }
112 r = pthread_mutex_unlock(&fut->mutex);
113 assert(r == 0);
114}
115
116
117static void _Future_Ready(
118 Future *fut
119){
120 int r;
121 fut->state = Future_State_Done;
122
123 // Take note of watchers list, and reset it
124 Future_WatcherSpec *watchers = fut->watchers;
125 int nwatchers = fut->nwatchers;
126 fut->watchers = NULL;
127 fut->nwatchers = 0;
128 r = pthread_mutex_unlock(&fut->mutex);
129 assert(r == 0);
130
131 // notify those watchers
132 for (int i = 0; i < nwatchers; i++) {
133 watchers[i].watcher(watchers[i].me, fut);
134 }
135 free(watchers);
136}
137
138
139void Future_SetResult(
140 Future *fut,
141 bool canceled,
142 void *value
143){
144 fut->vfptrs->set_result(fut, canceled, value);
145}
146
147
148bool Future_GetResult(
149 Future *fut,
150 void **res
151){
152 assert(fut->state == Future_State_Done);
153 if (res){
154 *res = fut->value;
155 }
156 return fut->canceled;
157}
158
159
160typedef struct future_bits {
161 Future *fut;
162 Coroutine *cor;
163} future_bits;
164
165
166static void future_complete(
167 void *me,
168 Future *fut
169){
170 (void)fut;
171 Coroutine *cor = (Coroutine *)me;
172 Coroutine_Continue(cor, NULL, false);
173}
174
175
176static void on_yield_for_future(
177 void *me
178){
179 future_bits *bits = (future_bits *)me;
180 Future_AddWatcher(bits->fut, future_complete, bits->cor);
181}
182
183
184void _Future_Await(
185 Future *fut
186){
187 future_bits bits;
188 bits.fut = fut;
189 bits.cor = Coroutine_GetActive();
190 Task *my_task = current_task;
191 current_task = NULL;
192 my_task->awaiting_future = fut;
193 if (my_task->canceled){
194 Future_SetResult(fut, true, my_task->cancel_value);
195 }
196 Coroutine_Yield(NULL, on_yield_for_future, &bits);
197 my_task->awaiting_future = NULL;
198 current_task = my_task;
199}
200
201
202void _Future_SetResult(
203 Future *fut,
204 bool canceled,
205 void *res
206){
207 int r;
208 r = pthread_mutex_lock(&fut->mutex);
209 assert(r == 0);
210 if(fut->state == Future_State_Waiting){
211 fut->canceled = canceled;
212 fut->value = res;
213 _Future_Ready(fut);
214 } else {
215 r = pthread_mutex_unlock(&fut->mutex);
216 assert(r == 0);
217 }
218}
219
220
221bool Future_Await(
222 Future *fut,
223 void **res
224){
225 fut->vfptrs->await(fut);
226 if (res){
227 *res = fut->value;
228 }
229 return fut->canceled;
230}
231