Unnamed Fossil Project

Artifact [26cac11b29]
Login

Artifact 26cac11b2969179cc24a195858aa9e2a711ac2912fd43c791d4b13ca546a5783:


/*
 * rotor.d
 *	Do a fair-across-key rotor data structure
 *
 * This is motivated by a web server, where multiple requests stacked
 *  up from a single IP address will be interleaved with requests
 *  from others.
 */
module tiny.rotor;

import std.exception : enforce;
import tiny.fifo : FIFO;
import std.stdio : writeln;

// Somebody has this bit of work waiting.
interface WorkItem {
    abstract void run();
}

// Who gave us work?  Their identity is represented by
//  an array of bytes (in our initial application, their
//  IP address).
// We assume uniformity of address format for now.  TBD is
//  interleaved v4 with v6, for instance.
class Submitter {
private:
    const ubyte[] address;
    size_t ahash;

public:
@safe:
    this(const ubyte[] addr) {
	this.address = addr;

	// Hash; xor in four bytes worth
	size_t res = 0;
	foreach(idx,b; this.address) {
	    res ^= cast(size_t)b << (8 * (idx & 3));
	}
	this.ahash = res;
    }

    // Hash based on address
    override size_t
    toHash() {
	return this.ahash;
    }

    // Same address?
    bool
    opEquals(in Submitter s) {
	writeln("opEquals");
	assert(this.address.length == s.address.length);
	foreach(idx,v; this.address) {
	    if (s.address[idx] != v) {
		return false;
	    }
	}
	return true;
    }

    // Ordering
    int
    opCmp(in Submitter s) {
	writeln("opCmp");
	assert(this.address.length == s.address.length);
	foreach(idx,v; this.address) {
	    ubyte b = s.address[idx];
	    if (v == b) {
		continue;
	    }
	    if (v < b) {
		return(-1);
	    }
	    return(1);
	}
	return 0;
    }
}
unittest {
    ubyte[] b1 = [1, 2, 3, 4], b2 = [1, 2, 5, 4];
    auto s1 = new Submitter(b1);
    auto s1a = new Submitter(b1);
    auto s2 = new Submitter(b2);
    assert(s1.opEquals(s1));
    assert(s1.opEquals(s1a));
    assert(s1 == s1);
    assert(s1 == s1a);
    assert(s1.opCmp(s2) < 0);
    assert(s1 < s2);
}

// WorkItem list for a given Submitter
class WorkItems {
private:
    Submitter sub;
    FIFO!WorkItem work;

@safe:
    this(Submitter sub) {
	this.sub = sub;
	this.work = new FIFO!WorkItem();
    }

public:
    bool
    empty() {
	return this.work.empty();
    }

    void
    add(WorkItem w) {
	this.work.add(w);
    }

    WorkItem
    next() {
	return this.work.next();
    }
}

// Organize incoming work and schedule "fairly"
class Rotor {

private:
    // Each submitter, taking their turns
    FIFO!WorkItems submitters;

    // Map from Submitter to their WorkItems
    WorkItems[Submitter] active;

public: @safe:
    this() {
	this.submitters = new FIFO!WorkItems();
    }

    // Add work from this Submitter
    void
    add(Submitter sub, WorkItem w)
    {
	// They already have work waiting, join the list
	auto ws = this.active.get(sub, null);
	if (ws is null) {
	    ws = new WorkItems(sub);
	    this.active[sub] = ws;
	    this.submitters.add(ws);
	}
	ws.add(w);
    }

    // Hand back next bit of work
    WorkItem
    next()
    {
	enforce(!this.submitters.empty(), "Rotor.next() empty");
	auto ws = this.submitters.next();
	auto w = ws.next();
	if (!ws.empty()) {
	    this.submitters.add(ws);
	} else {
	    this.active.remove(ws.sub);
	}
	return w;
    }

    // Anything left to run?
    bool
    empty() {
	return this.submitters.empty();
    }
}

unittest {
    auto r = new Rotor();
    const ubyte[] b1 = [1, 2, 3, 4], b2 = [1, 2, 5, 4];
    auto s1 = new Submitter(b1);
    auto s2 = new Submitter(b2);
    string[] history;
    class MyWorkItem : WorkItem {
	string nm;

	this(string nm) {
	    this.nm = nm;
	}
	void
	run() {
	    history ~= this.nm;
	}
	string
	name() {
	    return this.nm;
	}
    }
    auto w11 = new MyWorkItem("w11");
    auto w12 = new MyWorkItem("w12");
    auto w21 = new MyWorkItem("w21");
    auto w22 = new MyWorkItem("w22");
    auto w23 = new MyWorkItem("w23");
    r.add(s2, w21);
    r.add(s2, w22);
    r.add(s2, w23);
    r.add(s1, w11);
    r.add(s1, w12);
    foreach(_x; 0 .. 5) {
	auto wi = r.next();
	wi.run();
    }
    assert(history == ["w21", "w11", "w22", "w12", "w23"]);
    assert(r.empty());
    assert(r.submitters.empty());
    assert(r.active.length == 0);
}

version(unittest) {
    void
    main() {
    }
}