Source code for TileCache.Swarm

# BSD Licensed, Copyright (c) 2006-2010 TileCache Contributors

import struct, time
from sha import sha
from bisect import bisect_left

from Layer import Layer, Tile
from Client import WMS
from Service import Service

class Message(object):
    """One swarm message, convertible to and from its packed wire form.

    The wire form is a fixed header (see ``header``) followed by a body
    whose layout depends on the message type.  ``freeze`` produces the
    wire form; ``Message.thaw`` parses it back into a ``Message``.
    """

    # The header stores the message type as an index into this tuple.
    types = ("PING", "PONG", "GET", "PUT", "DELETE")

    # Header fields, in order: key (20 bytes), sequence id, type index,
    # checksum.  "!" selects network byte order and standard sizes.
    header = "!20sLHH"
    header_len = struct.calcsize(header)

    # Tile reference: layer name as a Pascal string, then three integers
    # that freeze_GET writes as tile.z, tile.x, tile.y.
    tilespec = "!31pHLL"
    tilespec_len = struct.calcsize(tilespec)

    # DELETE body: layer name, level, then the four bounds of ``box``.
    deletespec = "!31pHLLLL"

    def __init__(self, msgtype, key, seq, tile=None):
        """Create a message.

        msgtype -- one of the names in ``types``
        key     -- sender key, packed into the 20-byte header field
        seq     -- sequence id of this message
        tile    -- optional tile carried by GET/PUT messages
        """
        self.key = key
        self.seq_id = seq
        self.type = msgtype
        self.tile = tile

    def _thaw(classobj, msgtxt, layers):
        """Parse the packed message ``msgtxt`` and return a new instance.

        ``layers`` maps layer names to layer objects and is handed to the
        type-specific ``thaw_<TYPE>`` method together with the body.  The
        checksum field is stored on ``msg.checksum``; it is not verified
        here.  Raises ``struct.error`` for a truncated header and
        ``IndexError`` for a type index outside ``types``.
        """
        key, seq, msgtype, crc = struct.unpack(
            classobj.header, msgtxt[:classobj.header_len])
        msg = classobj(classobj.types[msgtype], key, seq)
        msg.checksum = crc
        dispatch = getattr(msg, "thaw_" + msg.type)
        dispatch(msgtxt[classobj.header_len:], layers)
        return msg

    thaw = classmethod(_thaw)

    def thaw_PING(self, msgtxt, layers):
        """A PING has no body; nothing to parse."""
        pass

    def thaw_PONG(self, msgtxt, layers):
        """Read the ``ping_id`` integer that makes up a PONG body."""
        # struct.unpack always returns a tuple; unpack the single field so
        # ping_id is an integer that freeze_PONG can pack again.
        (self.ping_id,) = struct.unpack("!L", msgtxt)

    def thaw_GET(self, msgtxt, layers):
        """Rebuild ``self.tile`` from a packed tile reference."""
        layername, level, row, col = struct.unpack(self.tilespec, msgtxt)
        self.tile = Tile(layers[layername], row, col, level)

    def thaw_PUT(self, msgtxt, layers):
        """Parse a tile reference, then attach the remaining bytes as data."""
        self.thaw_GET(msgtxt[:self.tilespec_len], layers)
        self.tile.data = msgtxt[self.tilespec_len:]

    def thaw_DELETE(self, msgtxt, layers):
        """Read the layer, level and box of a DELETE body.

        Sets ``self.layer``, ``self.level`` and ``self.box``; ``self.tile``
        is left untouched.
        """
        layername, level, minrow, mincol, maxrow, maxcol = \
            struct.unpack(self.deletespec, msgtxt)
        # Look the name up in the ``layers`` argument (the original code
        # referenced an undefined name ``layer`` here).
        self.layer = layers[layername]
        self.level = level
        self.box = (minrow, mincol, maxrow, maxcol)

    def freeze(self):
        """Return the packed wire form: header plus type-specific body.

        The checksum field is always written as 0.  Raises ``ValueError``
        if ``self.type`` is not one of ``types``.
        """
        msgtype = self.types.index(self.type)
        msgtxt = struct.pack(self.header, self.key, self.seq_id, msgtype, 0)
        dispatch = getattr(self, "freeze_" + self.type)
        msgtxt += dispatch()
        return msgtxt

    def freeze_PING(self):
        """A PING has an empty body."""
        return ""

    def freeze_PONG(self):
        """Pack ``self.ping_id`` as the PONG body."""
        return struct.pack("!L", self.ping_id)

    def freeze_GET(self):
        """Pack the layer name and z/x/y of ``self.tile``."""
        tile = self.tile
        return struct.pack(self.tilespec,
                           tile.layer.name, tile.z, tile.x, tile.y)

    def freeze_PUT(self):
        """Pack the tile reference followed by the raw tile data."""
        return self.freeze_GET() + self.tile.data

    def freeze_DELETE(self):
        """Pack the layer name, level and box of a DELETE message."""
        # thaw_DELETE records the layer on ``self.layer`` and leaves
        # ``self.tile`` as None, so prefer that attribute.  Messages that
        # were built around a tile still fall back to the tile's layer.
        layer = getattr(self, "layer", None)
        if layer is None:
            layer = self.tile.layer
        return struct.pack(self.deletespec,
                           layer.name, self.level, *self.box)
class Peer(object):
    """A member of the swarm: its key, network address and ring weight."""

    # Initial value of the per-peer ``timeout`` counter.
    min_timeout = 15

    def __init__(self, key=None, address=None, weight=10.0):
        """Record the peer's identity.

        key     -- identifier of the peer
        address -- network address of the peer, stored as given
        weight  -- relative weight; anything ``float()`` accepts
        """
        self.address = address
        self.key = key
        self.weight = float(weight)
        # A plain integer literal: Python promotes it to a long when
        # needed, so the Python-2-only ``0L`` spelling is unnecessary.
        self.seq_id = 0
        self.timeout = self.min_timeout
class Client(Peer):
    """The local swarm node.

    Keeps the table of known peers, arranges them on a sorted hash ring,
    picks the peers responsible for a tile, and sends and handles
    ``Message`` objects.  Transport and timers are abstract: subclasses
    must implement ``send``, ``schedule`` and ``load_peers``.
    """

    # Scale factor for how many ring entries a peer gets (see
    # rebalance_peers).
    max_ring_inserts = 64

    # Default number of peers select_peers returns for a key.
    replication = 3

    # Value a peer's timeout counter is reset to whenever a message from
    # it is handled.
    # NOTE(review): the original code read ``self.max_timeout`` without
    # defining it anywhere; the initial per-peer value is used here as an
    # assumption -- confirm the intended figure.
    max_timeout = Peer.min_timeout

    def __init__(self, service=None, **kwargs):
        """Set up empty peer, ring and request tables.

        service -- object stored as ``self.config``; the handlers read its
                   ``layers`` and ``cache`` attributes
        kwargs  -- forwarded to ``Peer.__init__`` (key, address, weight)
        """
        Peer.__init__(self, **kwargs)
        # Start sequence ids at the current time rather than at zero.
        self.seq_id = int(time.time())
        self.ring = []        # sorted list of (ring key, Peer) pairs
        self.peers = {}       # peer key -> Peer
        self.requests = {}    # tile key -> callbacks awaiting a PUT
        self.config = service
        self.server = None

    def tile_key(self, tile):
        """Return the SHA-1 digest that identifies ``tile``."""
        # Hash the layer *name* (as Message.freeze_GET does), not the layer
        # object.  "!" selects standard sizes and no alignment padding, so
        # every platform derives the same key for the same tile.
        ident = tile.layer.name + struct.pack("!xHLL", tile.z, tile.x, tile.y)
        return sha(ident).digest()

    def message(self, type, tile=None):
        """Build an outgoing message of ``type`` with the next sequence id."""
        self.seq_id += 1
        return Message(type, self.key, self.seq_id, tile)

    def drop_timeout(self, peer):
        """Decrement ``peer.timeout`` by one, stopping at zero."""
        if peer.timeout:
            peer.timeout -= 1

    def schedule_timeout(self, peer):
        """Schedule a ``drop_timeout`` call for ``peer`` via ``schedule``."""
        self.schedule(1.0, self.drop_timeout, peer)

    def set_put_callback(self, tile, callback):
        """Register ``callback`` to be called when ``tile`` arrives."""
        self.requests.setdefault(self.tile_key(tile), []).append(callback)

    def trigger_put_callbacks(self, tile):
        """Call and then forget every callback registered for ``tile``."""
        key = self.tile_key(tile)
        if key not in self.requests:
            return
        for callback in self.requests[key]:
            callback(tile)
        del self.requests[key]

    def send(self, peer, msg):
        """Deliver ``msg`` to ``peer``.  Must be provided by a subclass."""
        raise NotImplementedError()

    def schedule(self, when, callback, *args):
        """Arrange for ``callback(*args)`` to run later.

        Must be provided by a subclass.  ``when`` is passed as 1.0 by
        schedule_timeout.
        """
        raise NotImplementedError()

    def load_peers(self):
        """Refresh the peer table.  Must be provided by a subclass.

        The original stub had an unreachable
        ``self.schedule(15.0, self.load_peers)`` after the raise, so
        implementations are presumably expected to re-schedule themselves
        that way -- confirm before relying on it.
        """
        raise NotImplementedError()

    def load_peers_from_string(self, data):
        """Parse a peer directory and install it with ``set_peers``.

        ``data`` holds one peer per line as ``key ip port weight``,
        separated by single spaces.  Blank lines are skipped.  A line with
        any other number of fields raises ``ValueError``.
        """
        directory = []
        for line in data.split("\n"):
            line = line.strip()
            if not line:
                # e.g. the empty string left behind by a trailing newline
                continue
            key, ip, port, weight = line.split(" ")
            # NOTE(review): port is kept as the string read from the line;
            # convert it if the transport needs an integer.
            directory.append(Peer(key, (ip, port), weight))
        self.set_peers(directory)

    def set_peers(self, peers):
        """Synchronise ``self.peers`` with ``peers`` and rebuild the ring.

        Peers whose key is already known keep their existing ``Peer``
        object; unknown ones are added; known peers missing from ``peers``
        are removed.
        """
        new_peers = dict([(p.key, p) for p in peers])
        for key, peer in new_peers.items():
            if key not in self.peers:
                self.peers[key] = peer
        # Iterate over a snapshot of the keys: entries are deleted from the
        # dictionary inside the loop.
        for key in list(self.peers):
            if key not in new_peers:
                del self.peers[key]
        self.rebalance_peers()

    def rebalance_peers(self):
        """Rebuild ``self.ring`` from the current peer table.

        Each peer is entered once under its own key, plus additional
        entries under ``sha(key + chr(i))`` digests; the number of entries
        grows with the peer's share of the total weight, scaled by
        ``max_ring_inserts``.  The finished ring is sorted so that
        select_peers can bisect it.
        """
        ring = []
        peers = list(self.peers.values())
        # The extra 1.0 keeps the divisor non-zero when all weights are 0.
        total_weight = sum([p.weight for p in peers]) + 1.0
        for peer in peers:
            normal_weight = peer.weight / total_weight * self.max_ring_inserts
            # Collect into the local list; the attribute is replaced only
            # once the new ring is complete and sorted.
            ring.append((peer.key, peer))
            for i in range(1, int(normal_weight)):
                subkey = sha(peer.key + chr(i)).digest()
                ring.append((subkey, peer))
        ring.sort()
        self.ring = ring

    def select_peers(self, key, count=replication):
        """Return up to ``count`` distinct peers responsible for ``key``.

        ``key`` is a ring key or a ``Tile`` (which is converted with
        ``tile_key``).  The ring is walked once, starting at the first
        entry not less than ``key`` and wrapping around at the end.  Peers
        whose timeout counter is zero, and this node itself, are skipped.
        Returns an empty list when the ring is empty.
        """
        if isinstance(key, Tile):
            key = self.tile_key(key)
        ring = self.ring
        if not ring:
            return []
        size = len(ring)
        # bisect_left returns len(ring) for a key beyond the last entry;
        # the modulo wraps that back to the first entry.
        start = bisect_left(ring, (key,)) % size
        cursor = start
        selected = []
        while len(selected) < count:
            peer = ring[cursor][1]
            # A peer owns several ring entries; take each peer only once.
            if (peer.timeout and peer.key != self.key
                    and peer not in selected):
                selected.append(peer)
            cursor = (cursor + 1) % size
            if cursor == start:
                break   # walked the whole ring once
        return selected

    def send_GET(self, tile, callback=None):
        """Request ``tile`` from the peers selected for it.

        If ``callback`` is callable it is registered to run when the tile
        arrives in a PUT.
        """
        if callable(callback):
            self.set_put_callback(tile, callback)
        for target in self.select_peers(tile):
            msg = self.message("GET", tile)
            self.send(target, msg)
            self.schedule_timeout(target)

    def send_PUT(self, tile):
        """Send ``tile`` to each of the peers selected for it."""
        for target in self.select_peers(tile):
            msg = self.message("PUT", tile)
            self.send(target, msg)

    def send_PING(self, peer):
        """Send a PING to ``peer`` and schedule its timeout countdown."""
        self.send(peer, self.message("PING"))
        self.schedule_timeout(peer)

    def send_PONG(self, ping):
        """Answer the message ``ping`` with a PONG quoting its sequence id."""
        msg = self.message("PONG")
        msg.ping_id = ping.seq_id
        peer = self.peers[ping.key]
        self.send(peer, msg)

    def handle(self, thunk, address):
        """Parse an incoming packed message and dispatch it by type.

        thunk   -- the packed message
        address -- ``(host, port)`` pair the message came from

        Raises ``KeyError`` if the sender's key is not in ``self.peers``.
        """
        # Unpacked in the body rather than in the signature; tuple
        # parameters are not valid syntax on newer Python versions.
        host, port = address
        msg = Message.thaw(thunk, self.config.layers)
        peer = self.peers[msg.key]
        # validate originating host/port here
        peer.timeout = self.max_timeout
        dispatch = getattr(self, "handle_" + msg.type)
        dispatch(peer, msg)

    def handle_GET(self, peer, msg):
        """Reply with the cached tile as a PUT, or with a PONG if absent."""
        data = self.config.cache.get(msg.tile)
        if data:
            msg.tile.data = data
            reply = self.message("PUT", msg.tile)
            self.send(peer, reply)
        else:
            self.send_PONG(msg)

    def handle_PUT(self, peer, msg):
        """Store the received tile and run the callbacks waiting for it."""
        self.config.cache.set(msg.tile, msg.tile.data)
        self.trigger_put_callbacks(msg.tile)

    def handle_PING(self, peer, msg):
        """Answer a PING with a PONG."""
        self.send_PONG(msg)

    def handle_PONG(self, peer, msg):
        """Nothing to do: ``handle`` already reset the peer's timeout."""
        pass

    def handle_DELETE(self, peer, msg):
        """Not implemented yet."""
        pass