225 lines5.3 KB
#include "asleep.h"
#include "task.h"
#include <assert.h>
#include <errno.h>
#include <math.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "cor_platform.h"
10
11
12typedef struct Loop_Timer {
13 int64_t when;
14 Future *who;
15} Loop_Timer;
16
17typedef struct _ASleep {
18 _Cor_Mutex mutex;
19 unsigned users;
20 Loop_Timer *sleepers;
21 int lo;
22 int hi;
23 int max;
24 _Cor_Semaphore sem;
25 bool sleeploop_cancel;
26 _Cor_Thread sleeploop;
27 bool started;
28} _ASleep;
29
30static _ASleep asleep;
31
32static void *ASleep_Sleeper(void *param);
33
34static void ASleep_ctor(
35 _ASleep *me
36){
37 me->sleepers = NULL;
38 me->lo = 0;
39 me->hi = 0;
40 me->max = 0;
41
42 _Cor_Mutex_ctor(&me->mutex);
43 _Cor_Semaphore_ctor(&me->sem);
44 me->sleeploop_cancel = false;
45 _Cor_Thread_ctor(&me->sleeploop, ASleep_Sleeper, me);
46 me->started = true;
47}
48
49
50
51static void ASleep_dtor(
52 _ASleep *me
53){
54 me->started = false;
55 me->sleeploop_cancel = true;
56 _Cor_Semaphore_Signal(&me->sem);
57 _Cor_Thread_Join(&me->sleeploop);
58 _Cor_Thread_dtor(&me->sleeploop);
59 _Cor_Sempahore_dtor(&me->sem);
60 _Cor_Mutex_dtor(&me->mutex);
61 free(me->sleepers);
62}
63
64
65void ASleep_StartSystem(){
66 ASleep_ctor(&asleep);
67}
68
69
70void ASleep_StopSystem(){
71 ASleep_dtor(&asleep);
72}
73
74
75static void ASleep_AddSleeper(
76 _ASleep *me,
77 int64_t when,
78 Future *who
79){
80 _Cor_Mutex_Lock(&me->mutex);
81
82 int nsleepers;
83 if (me->lo <= me->hi){
84 int nsleepers = me->hi - me->lo;
85 // ensure there's always an empty slot so that lo == hi always means no sleepers
86 if (nsleepers+1 >= me->max) {
87 me->max = (me->max == 0) ? 4 : me->max * 2;
88 me->sleepers = realloc(me->sleepers, me->max * sizeof(Loop_Timer));
89 assert(me->sleepers);
90 }
91 } else {
92 nsleepers = me->hi + me->max - me->lo;
93 // ensure there's always an empty slot so that lo == hi always means no sleepers
94 if (nsleepers+1 >= me->max) {
95 int oldmax = me->max;
96 me->max = (me->max == 0) ? 4 : me->max * 2;
97 me->sleepers = realloc(me->sleepers, me->max * sizeof(Loop_Timer));
98 assert(me->sleepers);
99 memmove(&me->sleepers[me->lo], &me->lo + me->max - oldmax, sizeof(*me->sleepers) * oldmax - me->lo);
100 me->lo += me->max - oldmax;
101 }
102 }
103
104 // insertion sort - we're assuming there's not going to be many sleepers active at once
105 int i;
106 int nexti;
107 for (i = me->hi; i != me->lo; i = nexti){
108 nexti = i-1;
109 if (i < 0){
110 nexti += me->max;
111 }
112 if (me->sleepers[nexti].when <= when){
113 break;
114 }
115 me->sleepers[i] = me->sleepers[nexti];
116 }
117 me->sleepers[i].when = when;
118 me->sleepers[i].who = who;
119 me->hi += 1;
120 if (me->hi >= me->max){
121 me->hi -= me->max;
122 }
123
124 _Cor_Mutex_Unlock(&me->mutex);
125 _Cor_Semaphore_Signal(&me->sem);
126}
127
128
129static void ASleep_RemoveSleeper(
130 _ASleep *me,
131 int64_t when,
132 Future *who
133){
134 _Cor_Mutex_Lock(&me->mutex);
135
136 int i;
137 for (i = me->lo; i != me->hi; ){
138 if (me->sleepers[i].when == when && me->sleepers[i].who == who){
139 int previ = i;
140 i += 1;
141 if (i >= me->max){
142 i -= me->max;
143 }
144 for (; i != me->hi; ){
145 me->sleepers[previ] = me->sleepers[i];
146 previ = i;
147 i += 1;
148 if (i >= me->max){
149 i -= me->max;
150 }
151 }
152 me->hi = previ;
153 break;
154 }
155 i += 1;
156 if (i >= me->max){
157 i -= me->max;
158 }
159 }
160
161 _Cor_Mutex_Unlock(&me->mutex);
162 _Cor_Semaphore_Signal(&me->sem);
163}
164
165
166static void *ASleep_Sleeper(
167 void *param
168){
169 _ASleep *me = (_ASleep *)param;
170
171 while (!me->sleeploop_cancel){
172 _Cor_Mutex_Lock(&me->mutex);
173 bool got_one;
174 if (me->lo != me->hi){
175 int64_t when = me->sleepers[0].when;
176 _Cor_Mutex_Unlock(&me->mutex);
177 got_one = _Cor_Semaphore_Wait(&me->sem, when);
178 } else {
179 _Cor_Mutex_Unlock(&me->mutex);
180 got_one = _Cor_Semaphore_Wait(&me->sem, -1);
181 }
182 if (!got_one){
183 // timed out, so...
184 // issue any due timers
185 int64_t now = _Cor_Realtime_Now();
186 while(me->lo != me->hi && me->sleepers[me->lo].when <= now) {
187 Future *fut = me->sleepers[me->lo].who;
188 me->lo += 1;
189 if (me->lo >= me->max){
190 me->lo -= me->max;
191 }
192 _Cor_Mutex_Unlock(&me->mutex);
193 Future_SetResult(fut, false, NULL);
194 _Cor_Mutex_Lock(&me->mutex);
195 now = _Cor_Realtime_Now();
196 }
197 _Cor_Mutex_Unlock(&me->mutex);
198 }
199 }
200
201 return NULL;
202}
203
204
205// value is a pointer to a float delay in seconds
206bool ASleep(
207 float delay,
208 void **value
209){
210 assert(current_task && asleep.started);
211 if (delay < 0){
212 delay = 0;
213 }
214 int64_t endtime = _Cor_Realtime_Now() + (int64_t)(delay * 1000000000);
215
216 Future fut;
217 Future_ctor(&fut);
218 ASleep_AddSleeper(&asleep, endtime, &fut);
219 bool res = Future_Await(&fut, value);
220 ASleep_RemoveSleeper(&asleep, endtime, &fut);
221 Future_dtor(&fut);
222
223 return res;
224}
225