/*
* rotor.d
* A rotor data structure which shares turns fairly across keys
*
* This is motivated by a web server, where multiple requests stacked
* up from a single IP address will be interleaved with requests
* from others.
*/
module tiny.rotor;
import std.exception : enforce;
import tiny.fifo : FIFO;
// One unit of pending work which somebody has handed to us.
interface WorkItem {
    // Carry out the work.  Interface methods are implicitly
    // abstract, so implementors must supply this.
    void run();
}
// Who gave us work? Their identity is represented by
// an array of bytes (in our initial application, their
// IP address).
// Identity is by value: two Submitter objects built from the
// same bytes hash alike and compare equal, including when they
// are used as associative array keys.  Addresses of different
// lengths (v4 next to v6, for instance) never compare equal;
// for ordering, an address which is a strict prefix of another
// sorts first.
class Submitter {
private:
    // Our own copy of the identifying bytes
    const ubyte[] address;
    // Hash of the address, computed once by the constructor
    size_t ahash;

public:
@safe:
    // Build a Submitter from its address bytes.  The bytes are
    // copied so that the caller may reuse its buffer afterwards
    // without the address drifting away from the cached hash.
    this(const ubyte[] addr) {
        this.address = addr.dup;
        // Hash; xor in four bytes worth
        size_t res = 0;
        foreach (idx, b; this.address) {
            res ^= cast(size_t)b << (8 * (idx & 3));
        }
        this.ahash = res;
    }

pure:
    // Hash based on address
    override size_t
    toHash() {
        return this.ahash;
    }

    // Equality as seen through an Object reference.  This is the
    // virtual which class-keyed associative arrays go through, so
    // it has to be overridden (not merely overloaded) for equal
    // addresses to land on the same key.  Anything which is not a
    // Submitter, including null, is unequal.
    override bool
    opEquals(Object o) {
        auto s = cast(Submitter)o;
        return (s !is null) && this.sameAddress(s);
    }

    // Same address?  A null argument is never equal.
    bool
    opEquals(in Submitter s) {
        return (s !is null) && this.sameAddress(s);
    }

    // Ordering: bytewise from the front; when one address is a
    // prefix of the other, the shorter one sorts first.  Returns
    // a negative, zero, or positive value.
    int
    opCmp(in Submitter s) {
        immutable size_t mylen = this.address.length;
        immutable size_t otherlen = s.address.length;
        immutable size_t common = (mylen < otherlen) ? mylen : otherlen;
        foreach (idx; 0 .. common) {
            immutable ubyte mine = this.address[idx];
            immutable ubyte theirs = s.address[idx];
            if (mine != theirs) {
                return (mine < theirs) ? -1 : 1;
            }
        }
        if (mylen == otherlen) {
            return 0;
        }
        return (mylen < otherlen) ? -1 : 1;
    }

    // Byte-for-byte comparison shared by both opEquals flavors;
    // arrays of differing length compare unequal.
    private bool
    sameAddress(in Submitter s) const {
        return this.address == s.address;
    }
}
unittest {
    // Two buffers which first differ at index 2 (3 versus 5)
    ubyte[] lowBytes = [1, 2, 3, 4];
    ubyte[] highBytes = [1, 2, 5, 4];

    auto low = new Submitter(lowBytes);
    auto lowTwin = new Submitter(lowBytes);
    auto high = new Submitter(highBytes);

    // Equality by explicit call: against itself, and against a
    // second object built from the same bytes
    assert(low.opEquals(low));
    assert(low.opEquals(lowTwin));

    // Same two checks through the == operator
    assert(low == low);
    assert(low == lowTwin);

    // Ordering, by explicit call and through the < operator
    assert(low.opCmp(high) < 0);
    assert(low < high);
}
// The queue of WorkItem's waiting on behalf of one Submitter
class WorkItems {
    // Whose work this is
    private Submitter sub;
    // Their pending work
    private FIFO!WorkItem work;

    // Start an empty queue for this Submitter.  Private, so only
    // code in this module creates these.
    private this(Submitter sub) @safe {
        this.work = new FIFO!WorkItem();
        this.sub = sub;
    }

    // Is the underlying FIFO reporting nothing queued?
    public bool empty() @safe {
        return this.work.empty();
    }

    // Queue one more WorkItem for this Submitter
    public void add(WorkItem w) @safe {
        this.work.add(w);
    }

    // Hand back whatever the underlying FIFO yields next
    public WorkItem next() @safe {
        return this.work.next();
    }
}
// Organize incoming work and schedule "fairly": each call to
// next() serves one WorkItem from the Submitter whose turn it
// is, and that Submitter then rejoins the queue of turns if it
// still has work waiting.
class Rotor {
private:
    // Each submitter with pending work, queued for their turn
    FIFO!WorkItems submitters;

    // Map from Submitter to their WorkItems; holds an entry only
    // while that Submitter has work pending
    WorkItems[Submitter] active;

public:
@safe:
    // Start out with nobody waiting
    this() {
        this.submitters = new FIFO!WorkItems();
    }

    // Add work from this Submitter
    void add(Submitter sub, WorkItem w) {
        // They already have work waiting, join the list
        if (auto pending = this.active.get(sub, null)) {
            pending.add(w);
            return;
        }

        // First pending item for them: register their list and
        // give them a place in the queue of turns
        auto fresh = new WorkItems(sub);
        this.active[sub] = fresh;
        this.submitters.add(fresh);
        fresh.add(w);
    }

    // Hand back next bit of work.  Calling this with nothing
    // queued fails the enforce() check.
    WorkItem next() {
        enforce(!this.submitters.empty(), "Rotor.next() empty");

        auto turn = this.submitters.next();
        auto item = turn.next();
        if (turn.empty()) {
            // That was their last item, forget about them
            this.active.remove(turn.sub);
        } else {
            // More waiting, so they queue up for another turn
            this.submitters.add(turn);
        }
        return item;
    }

    // Anything left to run?  (True when nothing is.)
    bool empty() {
        return this.submitters.empty();
    }
}
unittest {
    // Names of the work items, in the order they actually ran
    string[] ranOrder;

    // A WorkItem which just records its name when run
    class NamedItem : WorkItem {
        string nm;

        this(string nm) {
            this.nm = nm;
        }

        void run() {
            ranOrder ~= this.nm;
        }

        string name() {
            return this.nm;
        }
    }

    const ubyte[] firstAddr = [1, 2, 3, 4];
    const ubyte[] secondAddr = [1, 2, 5, 4];
    auto rotor = new Rotor();
    auto first = new Submitter(firstAddr);
    auto second = new Submitter(secondAddr);

    // The second submitter stacks up three items before the
    // first submitter gets its two in
    foreach (nm; ["w21", "w22", "w23"]) {
        rotor.add(second, new NamedItem(nm));
    }
    foreach (nm; ["w11", "w12"]) {
        rotor.add(first, new NamedItem(nm));
    }

    // Drain all five
    for (int served = 0; served < 5; served++) {
        rotor.next().run();
    }

    // Turns alternate until the first submitter runs dry
    assert(ranOrder == ["w21", "w11", "w22", "w12", "w23"]);

    // Nothing may be left behind, in the queue or in the map
    assert(rotor.empty());
    assert(rotor.submitters.empty());
    assert(rotor.active.length == 0);
}
// Do-nothing entry point, compiled in only for unittest builds
// (presumably so this module can be built and run by itself to
// exercise its unittest blocks).
version(unittest) void main() {
}