blob: be65bcbc164417e1ccb5d83ae3199135a04b117c [file] [log] [blame]
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
#include "runtime.h"
static int32 debug = 0;
static Lock chanlock;
enum
{
Wclosed = 0x0001, // writer has closed
Rclosed = 0x0002, // reader has seen close
Eincr = 0x0004, // increment errors
Emax = 0x0800, // error limit before throw
};
typedef struct Hchan Hchan;
typedef struct Link Link;
typedef struct WaitQ WaitQ;
typedef struct SudoG SudoG;
typedef struct Select Select;
typedef struct Scase Scase;
struct SudoG
{
G* g; // g and selgen constitute
int32 selgen; // a weak pointer to g
int16 offset; // offset of case number
int8 isfree; // offset of case number
SudoG* link;
byte elem[8]; // synch data element (+ more)
};
struct WaitQ
{
SudoG* first;
SudoG* last;
};
struct Hchan
{
uint16 elemsize;
uint16 closed; // Wclosed Rclosed errorcount
uint32 dataqsiz; // size of the circular q
uint32 qcount; // total data in the q
Alg* elemalg; // interface for element type
Link* senddataq; // pointer for sender
Link* recvdataq; // pointer for receiver
WaitQ recvq; // list of recv waiters
WaitQ sendq; // list of send waiters
SudoG* free; // freelist
};
struct Link
{
Link* link; // asynch queue circular linked list
byte elem[8]; // asynch queue data element (+ more)
};
struct Scase
{
Hchan* chan; // chan
byte* pc; // return pc
uint16 send; // 0-recv 1-send 2-default
uint16 so; // vararg of selected bool
union {
byte elem[8]; // element (send)
byte* elemp; // pointer to element (recv)
} u;
};
struct Select
{
uint16 tcase; // total count of scase[]
uint16 ncase; // currently filled scase[]
Select* link; // for freelist
Scase* scase[1]; // one per case
};
static Select* selfree[20];
static SudoG* dequeue(WaitQ*, Hchan*);
static void enqueue(WaitQ*, SudoG*);
static SudoG* allocsg(Hchan*);
static void freesg(Hchan*, SudoG*);
static uint32 gcd(uint32, uint32);
static uint32 fastrand1(void);
static uint32 fastrand2(void);
// newchan(elemsize uint32, elemalg uint32, hint uint32) (hchan *chan any);
void
sys·newchan(uint32 elemsize, uint32 elemalg, uint32 hint,
Hchan* ret)
{
Hchan *c;
int32 i;
if(elemalg >= nelem(algarray)) {
printf("chan(alg=%d)\n", elemalg);
throw("sys·newchan: unsupported elem type");
}
c = mal(sizeof(*c));
c->elemsize = elemsize;
c->elemalg = &algarray[elemalg];
if(hint > 0) {
Link *d, *b, *e;
// make a circular q
b = nil;
e = nil;
for(i=0; i<hint; i++) {
d = mal(sizeof(*d) + c->elemsize - sizeof(d->elem));
if(e == nil)
e = d;
d->link = b;
b = d;
}
e->link = b;
c->recvdataq = b;
c->senddataq = b;
c->qcount = 0;
c->dataqsiz = hint;
}
ret = c;
FLUSH(&ret);
if(debug) {
prints("newchan: chan=");
sys·printpointer(c);
prints("; elemsize=");
sys·printint(elemsize);
prints("; elemalg=");
sys·printint(elemalg);
prints("; dataqsiz=");
sys·printint(c->dataqsiz);
prints("\n");
}
}
static void
incerr(Hchan* c)
{
c->closed += Eincr;
if(c->closed & Emax) {
unlock(&chanlock);
throw("too many operations on a closed channel");
}
}
/*
* generic single channel send/recv
* if the bool pointer is nil,
* then the full exchange will
* occur. if pres is not nil,
* then the protocol will not
* sleep but return if it could
* not complete
*/
void
sendchan(Hchan *c, byte *ep, bool *pres)
{
SudoG *sg;
G* gp;
if(debug) {
prints("chansend: chan=");
sys·printpointer(c);
prints("; elem=");
c->elemalg->print(c->elemsize, ep);
prints("\n");
}
lock(&chanlock);
loop:
if(c->closed & Wclosed)
goto closed;
if(c->dataqsiz > 0)
goto asynch;
sg = dequeue(&c->recvq, c);
if(sg != nil) {
if(ep != nil)
c->elemalg->copy(c->elemsize, sg->elem, ep);
gp = sg->g;
gp->param = sg;
unlock(&chanlock);
ready(gp);
if(pres != nil)
*pres = true;
return;
}
if(pres != nil) {
unlock(&chanlock);
*pres = false;
return;
}
sg = allocsg(c);
if(ep != nil)
c->elemalg->copy(c->elemsize, sg->elem, ep);
g->param = nil;
g->status = Gwaiting;
enqueue(&c->sendq, sg);
unlock(&chanlock);
gosched();
lock(&chanlock);
sg = g->param;
if(sg == nil)
goto loop;
freesg(c, sg);
unlock(&chanlock);
if(pres != nil)
*pres = true;
return;
asynch:
if(c->closed & Wclosed)
goto closed;
if(c->qcount >= c->dataqsiz) {
if(pres != nil) {
unlock(&chanlock);
*pres = false;
return;
}
sg = allocsg(c);
g->status = Gwaiting;
enqueue(&c->sendq, sg);
unlock(&chanlock);
gosched();
lock(&chanlock);
goto asynch;
}
if(ep != nil)
c->elemalg->copy(c->elemsize, c->senddataq->elem, ep);
c->senddataq = c->senddataq->link;
c->qcount++;
sg = dequeue(&c->recvq, c);
if(sg != nil) {
gp = sg->g;
freesg(c, sg);
unlock(&chanlock);
ready(gp);
} else
unlock(&chanlock);
if(pres != nil)
*pres = true;
return;
closed:
incerr(c);
if(pres != nil)
*pres = true;
unlock(&chanlock);
}
static void
chanrecv(Hchan* c, byte *ep, bool* pres)
{
SudoG *sg;
G *gp;
if(debug) {
prints("chanrecv: chan=");
sys·printpointer(c);
prints("\n");
}
lock(&chanlock);
loop:
if(c->dataqsiz > 0)
goto asynch;
if(c->closed & Wclosed)
goto closed;
sg = dequeue(&c->sendq, c);
if(sg != nil) {
c->elemalg->copy(c->elemsize, ep, sg->elem);
gp = sg->g;
gp->param = sg;
unlock(&chanlock);
ready(gp);
if(pres != nil)
*pres = true;
return;
}
if(pres != nil) {
unlock(&chanlock);
*pres = false;
return;
}
sg = allocsg(c);
g->param = nil;
g->status = Gwaiting;
enqueue(&c->recvq, sg);
unlock(&chanlock);
gosched();
lock(&chanlock);
sg = g->param;
if(sg == nil)
goto loop;
c->elemalg->copy(c->elemsize, ep, sg->elem);
freesg(c, sg);
unlock(&chanlock);
if(pres != nil)
*pres = true;
return;
asynch:
if(c->qcount <= 0) {
if(c->closed & Wclosed)
goto closed;
if(pres != nil) {
unlock(&chanlock);
*pres = false;
return;
}
sg = allocsg(c);
g->status = Gwaiting;
enqueue(&c->recvq, sg);
unlock(&chanlock);
gosched();
lock(&chanlock);
goto asynch;
}
c->elemalg->copy(c->elemsize, ep, c->recvdataq->elem);
c->recvdataq = c->recvdataq->link;
c->qcount--;
sg = dequeue(&c->sendq, c);
if(sg != nil) {
gp = sg->g;
freesg(c, sg);
unlock(&chanlock);
ready(gp);
if(pres != nil)
*pres = true;
return;
}
unlock(&chanlock);
if(pres != nil)
*pres = true;
return;
closed:
c->elemalg->copy(c->elemsize, ep, nil);
c->closed |= Rclosed;
incerr(c);
if(pres != nil)
*pres = true;
unlock(&chanlock);
}
// chansend1(hchan *chan any, elem any);
void
sys·chansend1(Hchan* c, ...)
{
int32 o;
byte *ae;
o = rnd(sizeof(c), c->elemsize);
ae = (byte*)&c + o;
sendchan(c, ae, nil);
}
// chansend2(hchan *chan any, elem any) (pres bool);
void
sys·chansend2(Hchan* c, ...)
{
int32 o;
byte *ae, *ap;
o = rnd(sizeof(c), c->elemsize);
ae = (byte*)&c + o;
o = rnd(o+c->elemsize, 1);
ap = (byte*)&c + o;
sendchan(c, ae, ap);
}
// chanrecv1(hchan *chan any) (elem any);
void
sys·chanrecv1(Hchan* c, ...)
{
int32 o;
byte *ae;
o = rnd(sizeof(c), c->elemsize);
ae = (byte*)&c + o;
chanrecv(c, ae, nil);
}
// chanrecv2(hchan *chan any) (elem any, pres bool);
void
sys·chanrecv2(Hchan* c, ...)
{
int32 o;
byte *ae, *ap;
o = rnd(sizeof(c), c->elemsize);
ae = (byte*)&c + o;
o = rnd(o+c->elemsize, 1);
ap = (byte*)&c + o;
chanrecv(c, ae, ap);
}
// chanrecv3(hchan *chan any, elem *any) (pres bool);
void
sys·chanrecv3(Hchan* c, byte* ep, byte pres)
{
chanrecv(c, ep, &pres);
}
// newselect(size uint32) (sel *byte);
void
sys·newselect(int32 size, Select *sel)
{
int32 n;
n = 0;
if(size > 1)
n = size-1;
lock(&chanlock);
sel = nil;
if(size >= 1 && size < nelem(selfree)) {
sel = selfree[size];
if(sel != nil)
selfree[size] = sel->link;
}
unlock(&chanlock);
if(sel == nil)
sel = mal(sizeof(*sel) + n*sizeof(sel->scase[0]));
sel->tcase = size;
sel->ncase = 0;
FLUSH(&sel);
if(debug) {
prints("newselect s=");
sys·printpointer(sel);
prints(" size=");
sys·printint(size);
prints("\n");
}
}
// selectsend(sel *byte, hchan *chan any, elem any) (selected bool);
void
sys·selectsend(Select *sel, Hchan *c, ...)
{
int32 i, eo;
Scase *cas;
byte *ae;
// nil cases do not compete
if(c == nil)
return;
i = sel->ncase;
if(i >= sel->tcase)
throw("selectsend: too many cases");
sel->ncase = i+1;
cas = sel->scase[i];
if(cas == nil) {
cas = mal(sizeof *cas + c->elemsize - sizeof(cas->u.elem));
sel->scase[i] = cas;
}
cas->pc = sys·getcallerpc(&sel);
cas->chan = c;
eo = rnd(sizeof(sel), sizeof(c));
eo = rnd(eo+sizeof(c), c->elemsize);
cas->so = rnd(eo+c->elemsize, 1);
cas->send = 1;
ae = (byte*)&sel + eo;
c->elemalg->copy(c->elemsize, cas->u.elem, ae);
if(debug) {
prints("selectsend s=");
sys·printpointer(sel);
prints(" pc=");
sys·printpointer(cas->pc);
prints(" chan=");
sys·printpointer(cas->chan);
prints(" po=");
sys·printint(cas->so);
prints(" send=");
sys·printint(cas->send);
prints("\n");
}
}
// selectrecv(sel *byte, hchan *chan any, elem *any) (selected bool);
void
sys·selectrecv(Select *sel, Hchan *c, ...)
{
int32 i, eo;
Scase *cas;
// nil cases do not compete
if(c == nil)
return;
i = sel->ncase;
if(i >= sel->tcase)
throw("selectrecv: too many cases");
sel->ncase = i+1;
cas = sel->scase[i];
if(cas == nil) {
cas = mal(sizeof *cas);
sel->scase[i] = cas;
}
cas->pc = sys·getcallerpc(&sel);
cas->chan = c;
eo = rnd(sizeof(sel), sizeof(c));
eo = rnd(eo+sizeof(c), sizeof(byte*));
cas->so = rnd(eo+sizeof(byte*), 1);
cas->send = 0;
cas->u.elemp = *(byte**)((byte*)&sel + eo);
if(debug) {
prints("selectrecv s=");
sys·printpointer(sel);
prints(" pc=");
sys·printpointer(cas->pc);
prints(" chan=");
sys·printpointer(cas->chan);
prints(" so=");
sys·printint(cas->so);
prints(" send=");
sys·printint(cas->send);
prints("\n");
}
}
// selectdefaul(sel *byte) (selected bool);
void
sys·selectdefault(Select *sel, ...)
{
int32 i;
Scase *cas;
i = sel->ncase;
if(i >= sel->tcase)
throw("selectdefault: too many cases");
sel->ncase = i+1;
cas = sel->scase[i];
if(cas == nil) {
cas = mal(sizeof *cas);
sel->scase[i] = cas;
}
cas->pc = sys·getcallerpc(&sel);
cas->chan = nil;
cas->so = rnd(sizeof(sel), 1);
cas->send = 2;
cas->u.elemp = nil;
if(debug) {
prints("selectdefault s=");
sys·printpointer(sel);
prints(" pc=");
sys·printpointer(cas->pc);
prints(" so=");
sys·printint(cas->so);
prints(" send=");
sys·printint(cas->send);
prints("\n");
}
}
// selectgo(sel *byte);
void
sys·selectgo(Select *sel)
{
uint32 p, o, i;
Scase *cas, *dfl;
Hchan *c;
SudoG *sg;
G *gp;
byte *as;
if(debug) {
prints("selectgo: sel=");
sys·printpointer(sel);
prints("\n");
}
if(sel->ncase < 2) {
if(sel->ncase < 1)
throw("selectgo: no cases");
// make special case of one.
}
// select a (relative) prime
for(i=0;; i++) {
p = fastrand1();
if(gcd(p, sel->ncase) == 1)
break;
if(i > 1000) {
throw("selectgo: failed to select prime");
}
}
// select an initial offset
o = fastrand2();
p %= sel->ncase;
o %= sel->ncase;
lock(&chanlock);
loop:
// pass 1 - look for something already waiting
dfl = nil;
for(i=0; i<sel->ncase; i++) {
cas = sel->scase[o];
if(cas->send == 2) { // default
dfl = cas;
goto next1;
}
c = cas->chan;
if(c->dataqsiz > 0) {
if(cas->send) {
if(c->closed & Wclosed)
goto sclose;
if(c->qcount < c->dataqsiz)
goto asyns;
goto next1;
}
if(c->qcount > 0)
goto asynr;
if(c->closed & Wclosed)
goto rclose;
goto next1;
}
if(cas->send) {
if(c->closed & Wclosed)
goto sclose;
sg = dequeue(&c->recvq, c);
if(sg != nil)
goto gots;
goto next1;
}
sg = dequeue(&c->sendq, c);
if(sg != nil)
goto gotr;
if(c->closed & Wclosed)
goto rclose;
next1:
o += p;
if(o >= sel->ncase)
o -= sel->ncase;
}
if(dfl != nil) {
cas = dfl;
goto retc;
}
// pass 2 - enqueue on all chans
for(i=0; i<sel->ncase; i++) {
cas = sel->scase[o];
c = cas->chan;
if(c->dataqsiz > 0) {
if(cas->send) {
if(c->qcount < c->dataqsiz) {
prints("selectgo: pass 2 async send\n");
goto asyns;
}
sg = allocsg(c);
sg->offset = o;
enqueue(&c->sendq, sg);
goto next2;
}
if(c->qcount > 0) {
prints("selectgo: pass 2 async recv\n");
goto asynr;
}
sg = allocsg(c);
sg->offset = o;
enqueue(&c->recvq, sg);
goto next2;
}
if(cas->send) {
sg = dequeue(&c->recvq, c);
if(sg != nil) {
prints("selectgo: pass 2 sync send\n");
g->selgen++;
goto gots;
}
sg = allocsg(c);
sg->offset = o;
c->elemalg->copy(c->elemsize, sg->elem, cas->u.elem);
enqueue(&c->sendq, sg);
goto next2;
}
sg = dequeue(&c->sendq, c);
if(sg != nil) {
prints("selectgo: pass 2 sync recv\n");
g->selgen++;
goto gotr;
}
sg = allocsg(c);
sg->offset = o;
enqueue(&c->recvq, sg);
next2:
o += p;
if(o >= sel->ncase)
o -= sel->ncase;
}
g->param = nil;
g->status = Gwaiting;
unlock(&chanlock);
gosched();
lock(&chanlock);
sg = g->param;
if(sg == nil)
goto loop;
o = sg->offset;
cas = sel->scase[o];
c = cas->chan;
if(c->dataqsiz > 0) {
// prints("shouldnt happen\n");
goto loop;
}
if(debug) {
prints("wait-return: sel=");
sys·printpointer(sel);
prints(" c=");
sys·printpointer(c);
prints(" cas=");
sys·printpointer(cas);
prints(" send=");
sys·printint(cas->send);
prints(" o=");
sys·printint(o);
prints("\n");
}
if(!cas->send) {
if(cas->u.elemp != nil)
c->elemalg->copy(c->elemsize, cas->u.elemp, sg->elem);
}
freesg(c, sg);
goto retc;
asynr:
if(cas->u.elemp != nil)
c->elemalg->copy(c->elemsize, cas->u.elemp, c->recvdataq->elem);
c->recvdataq = c->recvdataq->link;
c->qcount--;
sg = dequeue(&c->sendq, c);
if(sg != nil) {
gp = sg->g;
freesg(c, sg);
ready(gp);
}
goto retc;
asyns:
if(cas->u.elem != nil)
c->elemalg->copy(c->elemsize, c->senddataq->elem, cas->u.elem);
c->senddataq = c->senddataq->link;
c->qcount++;
sg = dequeue(&c->recvq, c);
if(sg != nil) {
gp = sg->g;
freesg(c, sg);
ready(gp);
}
goto retc;
gotr:
// recv path to wakeup the sender (sg)
if(debug) {
prints("gotr: sel=");
sys·printpointer(sel);
prints(" c=");
sys·printpointer(c);
prints(" o=");
sys·printint(o);
prints("\n");
}
if(cas->u.elemp != nil)
c->elemalg->copy(c->elemsize, cas->u.elemp, sg->elem);
gp = sg->g;
gp->param = sg;
ready(gp);
goto retc;
rclose:
if(cas->u.elemp != nil)
c->elemalg->copy(c->elemsize, cas->u.elemp, nil);
c->closed |= Rclosed;
incerr(c);
goto retc;
gots:
// send path to wakeup the receiver (sg)
if(debug) {
prints("gots: sel=");
sys·printpointer(sel);
prints(" c=");
sys·printpointer(c);
prints(" o=");
sys·printint(o);
prints("\n");
}
if(c->closed & Wclosed)
goto sclose;
c->elemalg->copy(c->elemsize, sg->elem, cas->u.elem);
gp = sg->g;
gp->param = sg;
ready(gp);
goto retc;
sclose:
incerr(c);
goto retc;
retc:
if(sel->ncase >= 1 && sel->ncase < nelem(selfree)) {
sel->link = selfree[sel->ncase];
selfree[sel->ncase] = sel;
}
unlock(&chanlock);
sys·setcallerpc(&sel, cas->pc);
as = (byte*)&sel + cas->so;
*as = true;
}
// closechan(sel *byte);
void
sys·closechan(Hchan *c)
{
SudoG *sg;
G* gp;
lock(&chanlock);
incerr(c);
c->closed |= Wclosed;
// release all readers
for(;;) {
sg = dequeue(&c->recvq, c);
if(sg == nil)
break;
gp = sg->g;
gp->param = nil;
freesg(c, sg);
ready(gp);
}
// release all writers
for(;;) {
sg = dequeue(&c->sendq, c);
if(sg == nil)
break;
gp = sg->g;
gp->param = nil;
freesg(c, sg);
ready(gp);
}
unlock(&chanlock);
}
// closedchan(sel *byte) bool;
void
sys·closedchan(Hchan *c, bool closed)
{
// test Rclosed
closed = 0;
if(c->closed & Rclosed)
closed = 1;
FLUSH(&closed);
}
static SudoG*
dequeue(WaitQ *q, Hchan *c)
{
SudoG *sgp;
loop:
sgp = q->first;
if(sgp == nil)
return nil;
q->first = sgp->link;
// if sgp is stale, ignore it
if(sgp->selgen != sgp->g->selgen) {
//prints("INVALID PSEUDOG POINTER\n");
freesg(c, sgp);
goto loop;
}
// invalidate any others
sgp->g->selgen++;
return sgp;
}
static void
enqueue(WaitQ *q, SudoG *sgp)
{
sgp->link = nil;
if(q->first == nil) {
q->first = sgp;
q->last = sgp;
return;
}
q->last->link = sgp;
q->last = sgp;
}
static SudoG*
allocsg(Hchan *c)
{
SudoG* sg;
sg = c->free;
if(sg != nil) {
c->free = sg->link;
} else
sg = mal(sizeof(*sg) + c->elemsize - sizeof(sg->elem));
sg->selgen = g->selgen;
sg->g = g;
sg->offset = 0;
sg->isfree = 0;
return sg;
}
static void
freesg(Hchan *c, SudoG *sg)
{
if(sg != nil) {
if(sg->isfree)
throw("chan.freesg: already free");
sg->isfree = 1;
sg->link = c->free;
c->free = sg;
}
}
static uint32
gcd(uint32 u, uint32 v)
{
for(;;) {
if(u > v) {
if(v == 0)
return u;
u = u%v;
continue;
}
if(u == 0)
return v;
v = v%u;
}
}
static uint32
fastrand1(void)
{
static uint32 x = 0x49f6428aUL;
x += x;
if(x & 0x80000000L)
x ^= 0x88888eefUL;
return x;
}
static uint32
fastrand2(void)
{
static uint32 x = 0x49f6428aUL;
x += x;
if(x & 0x80000000L)
x ^= 0xfafd871bUL;
return x;
}