[go: nahoru, domu]

Skip to content

Commit

Permalink
datapaths: Add dpctl functionality to pcap_switch
Browse files Browse the repository at this point in the history
This actually makes a number of semi-related changes:
* Refactors port generation so that switch subclasses can alter behavior
* Adds a little datapaths.ctl framework and simple client
* Fixes some minor bugs in switch base class
* Makes switch base class call its own API functions
* Refactors port/interface initialization in pcap_switch
* Gives pcap_switch ability to add/remove ports while running
* Implements add-port/del-port/show dpctl commands
  • Loading branch information
MurphyMc committed Sep 29, 2013
1 parent aa78da9 commit 55a1301
Show file tree
Hide file tree
Showing 3 changed files with 330 additions and 60 deletions.
139 changes: 139 additions & 0 deletions pox/datapaths/ctl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
# Copyright 2013 James McCauley
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Simple datapath control framework for POX datapaths
"""

from pox.core import core
from pox.lib.ioworker.workers import *
from pox.lib.ioworker import *
from pox.lib.revent import *


# IOLoop for our IO workers
# Shared by every Server.  Stays None until create_server() creates and
# starts a RecocoIOLoop.
_ioloop = None

# Log
# Module logger.  Stays None until create_server() sets it from
# core.getLogger().
log = None


class CommandEvent (Event):
  """
  Event fired whenever a command is received

  worker is the worker the command arrived on
  cmd is the raw text of the command line
  """
  def __init__ (self, worker, cmd):
    super(CommandEvent,self).__init__()
    self.worker = worker
    self.cmd = cmd

  def _words (self):
    """
    Returns the command split into whitespace-separated words
    """
    # split() with no separator already ignores leading/trailing whitespace
    return self.cmd.split()

  @property
  def first (self):
    """
    The command name (the first word), or '' if the line was blank
    """
    words = self._words()
    # A blank line used to raise IndexError here; report "no command" instead
    return words[0] if words else ''

  @property
  def args (self):
    """
    List of the words following the command name (possibly empty)
    """
    return self._words()[1:]

  def __str__ (self):
    return "<%s: %s>" % (self.worker, self.cmd)


class ServerWorker (TCPServerWorker, RecocoIOWorker):
  """
  Listening worker which accepts control connections

  Adds nothing of its own; it only combines the two base classes.
  """
  #TODO: Really should just add this to the ioworker package.


class Worker (RecocoIOWorker):
  """
  Worker to receive POX dpctl commands

  Received data is accumulated, and every complete newline-terminated
  line is raised on core.ctld as a CommandEvent.
  """
  def __init__ (self, *args, **kw):
    super(Worker, self).__init__(*args, **kw)
    # NOTE(review): presumably consumed by the base class -- confirm
    self._connecting = True
    # Data received so far which has not been terminated by a newline
    self._buf = b''

  def _process (self, data):
    """
    Buffers data and raises a CommandEvent for each complete line
    """
    self._buf += data
    while '\n' in self._buf:
      fore,self._buf = self._buf.split('\n', 1)
      core.ctld.raiseEventNoErrors(CommandEvent, self, fore)

  def _handle_rx (self):
    # Read exactly once and let _process() do the buffering.  Reading
    # twice (once straight into _buf and once more for _process) was at
    # best redundant, and would duplicate data if read() does not consume.
    self._process(self.read())

  def _exec (self, msg):
    # Has no effect as written (the split result is discarded); kept so
    # that the class's interface does not change.
    msg.split()


class Server (EventMixin):
  """
  Control server which listens on a TCP port

  Declares CommandEvent as the event it can raise.
  """
  _eventMixin_events = set([CommandEvent])

  def __init__ (self, port = 7791):
    # Accepted connections are handled by Worker instances
    self.server_worker = ServerWorker(child_worker_type=Worker, port = port)
    _ioloop.register_worker(self.server_worker)


def create_server (port = 7791):
  """
  Creates and returns a Server listening on the given TCP port

  The module-level logger and IO loop are created the first time they
  are needed and reused afterwards.
  """
  global log, _ioloop

  # Logging
  if not log:
    log = core.getLogger()

  # IO loop
  if not _ioloop:
    _ioloop = RecocoIOLoop()
    #_ioloop.more_debugging = True
    _ioloop.start()

  return Server(port = int(port))


def server (port = 7791):
  """
  Creates a control server and registers it with core as "ctld"
  """
  core.register("ctld", create_server(int(port)))


def launch (cmd, address = None, port = 7791):
  """
  Simple client: sends one command to a control server and logs the reply

  cmd is the command line to send (a newline is appended)
  address is the server's address (defaults to 127.0.0.1)
  port is the server's TCP port
  """
  import socket

  # NOTE(review): presumably POX itself isn't needed when acting as a
  # one-shot client, hence quitting right away -- confirm
  core.quit()
  if not address:
    address = "127.0.0.1"
  # 100 is above every standard logging level, so this silences 'core'
  core.getLogger('core').setLevel(100)
  log = core.getLogger('ctl')

  try:
    s = socket.create_connection((address,port), timeout=2)
  except Exception:
    log.error("Couldn't connect")
    return

  try:
    s.settimeout(2)
    # sendall() keeps going until the whole command has been written
    s.sendall(cmd + "\n")
    d = s.recv(4096).strip()
    log.info(d)
  except socket.timeout:
    log.warning("No response")
  except Exception:
    log.exception("While communicating")
  finally:
    # Always release the connection, whatever happened above
    s.close()
198 changes: 159 additions & 39 deletions pox/datapaths/pcap_switch.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,49 +32,97 @@

# Module-level logger
log = core.getLogger()

# Control port used when launch() is passed ctl_port=True
DEFAULT_CTL_PORT = 7791

# Switches created by launch(), keyed by switch name; the ctl command
# handlers look switches up here
_switches = {}

def _do_ctl (event):
  """
  CommandEvent handler: runs the command and sends the reply to the sender

  A command which produces no reply text is answered with "Okay."
  """
  result = _do_ctl2(event)
  reply = "Okay." if result is None else result
  event.worker.send(reply + "\n")

def _do_ctl2 (event):
  """
  Executes a single control command

  Commands:
    add-port [switch] <interface>   (switch optional if there is only one)
    del-port [switch] <interface>   (without switch, all are searched)
    show

  Returns the reply text, or None if the command succeeded and has
  nothing to report.  Failures are logged and returned as "Error: ...".
  """
  def errf (msg, *args):
    # Passed as on_error so that add_interface() problems become exceptions
    raise RuntimeError(msg % args)

  args = event.args

  def ra (low, high = None):
    # Require between low and high (inclusive) arguments
    if high is None: high = low
    if len(args) < low or len(args) > high:
      raise RuntimeError("Wrong number of arguments")

  def get_switch (name):
    # Look a switch up by name with a readable error if it doesn't exist
    if name not in _switches:
      raise RuntimeError("No such switch")
    return _switches[name]

  try:
    cmd = event.first
    if cmd == "add-port":
      ra(1,2)
      if len(args) == 1 and len(_switches) == 1:
        # Only one switch, so it doesn't need to be named
        sw = list(_switches.values())[0]
        p = args[0]
      else:
        ra(2)
        sw = get_switch(args[0])
        p = args[1]
      sw.add_interface(p, start=True, on_error=errf)
    elif cmd == "del-port":
      ra(1,2)
      if len(args) == 1:
        for sw in _switches.values():
          # ports maps port numbers to port objects; the names are on the
          # values (iterating the dict itself yields only the numbers)
          for p in sw.ports.values():
            if p.name == args[0]:
              sw.remove_interface(args[0])
              return
        raise RuntimeError("No such interface")
      sw = get_switch(args[0])
      sw.remove_interface(args[1])
    elif cmd == "show":
      ra(0)
      s = []
      for sw in _switches.values():
        s.append("Switch %s" % (sw.name,))
        for no,p in sw.ports.items():
          s.append(" %3s %s" % (no, p.name))
      return "\n".join(s)

    else:
      raise RuntimeError("Unknown command")

  except Exception as e:
    log.exception("While processing command")
    return "Error: " + str(e)


def launch (address = '127.0.0.1', port = 6633, max_retry_delay = 16,
    dpid = None, ports = '', extra = None, ctl_port = None,
    __INSTANCE__ = None):
  """
  Launches a switch

  address, port, max_retry_delay, dpid and extra are handed to do_launch()
  ports is a comma-separated list of interface names to use as ports
  ctl_port enables the control server: True means DEFAULT_CTL_PORT, any
           other true value is used as the TCP port number
  __INSTANCE__ is not used here
  """

  if not pxpcap.enabled:
    raise RuntimeError("You need PXPCap to use this component")

  if ctl_port:
    if core.hasComponent('ctld'):
      raise RuntimeError("Only one ctl_port is allowed")

    if ctl_port is True:
      ctl_port = DEFAULT_CTL_PORT

    import ctl
    ctl.server(ctl_port)
    core.ctld.addListenerByName("CommandEvent", _do_ctl)

  _ports = ports.strip()
  def up (event):
    # Runs when core raises UpEvent.  Interface validation and port setup
    # are done by the switch itself, so only the names are passed along.
    ports = [p for p in _ports.split(",") if p]

    sw = do_launch(PCapSwitch, address, port, max_retry_delay, dpid,
                   ports=ports, extra_args=extra)
    # Remember the switch so that control commands can find it by name
    _switches[sw.name] = sw

  core.addListenerByName("UpEvent", up)

Expand All @@ -83,25 +131,31 @@ class PCapSwitch (ExpireMixin, SoftwareSwitchBase):
# Default level for loggers of this class
default_log_level = logging.INFO

def __init__ (self, *args, **kw):
def __init__ (self, **kw):
"""
Create a switch instance
Additional options over superclass:
log_level (default to default_log_level) is level for this instance
ports is a list of interface names
"""
log_level = kw.pop('log_level', self.default_log_level)

self.q = Queue()
self.t = Thread(target=self._consumer_threadproc)
core.addListeners(self)

super(PCapSwitch,self).__init__(*args,**kw)
ports = kw.pop('ports', [])
kw['ports'] = []

super(PCapSwitch,self).__init__(**kw)

self._next_port = 1

self.px = {}
for p in self.ports.values():
px = pxpcap.PCap(p.name, callback = self._pcap_rx, start = False)
px.port_no = p.port_no
self.px[p.port_no] = px

for p in ports:
self.add_interface(p, start=False)

self.log.setLevel(log_level)

Expand All @@ -110,6 +164,69 @@ def __init__ (self, *args, **kw):

self.t.start()

def add_interface (self, name, port_no=-1, on_error=None, start=False):
  """
  Adds a network interface to the switch as a new port

  name is the name of the interface (pcap device)
  port_no is the port number to use, or -1 to use the next free one
  on_error is called as on_error(format, *args) when the interface can't
           be added (defaults to log.error)
  start is whether to start capturing on the interface right away

  Returns the new PCap object.  Returns None if the interface was
  rejected and on_error returned rather than raising.
  """
  if on_error is None:
    on_error = log.error

  devs = pxpcap.PCap.get_devices()
  if name not in devs:
    on_error("Device %s not available -- ignoring", name)
    return
  dev = devs[name]
  addrs = dev.get('addrs',{})
  if addrs.get('ethernet',{}).get('addr') is None:
    on_error("Device %s has no ethernet address -- ignoring", name)
    return
  if addrs.get('AF_INET') is not None:
    on_error("Device %s has an IP address -- ignoring", name)
    return
  for p in self.px.values():
    # A capture whose port_no is None has been detached by
    # remove_interface() and doesn't count as still being present.
    if p.device == name and p.port_no is not None:
      on_error("Device %s already added", name)
      # Like the other checks, stop here if on_error didn't raise
      return

  if port_no == -1:
    # Pick the next port number which isn't in use
    while True:
      port_no = self._next_port
      self._next_port += 1
      if port_no not in self.ports: break

  if port_no in self.ports:
    on_error("Port %s already exists -- ignoring", port_no)
    return

  phy = of.ofp_phy_port()
  phy.port_no = port_no
  phy.hw_addr = dev['addrs']['ethernet']['addr']
  phy.name = name
  # Fill in features sort of arbitrarily
  phy.curr = of.OFPPF_10MB_HD
  phy.advertised = of.OFPPF_10MB_HD
  phy.supported = of.OFPPF_10MB_HD
  phy.peer = of.OFPPF_10MB_HD

  self.add_port(phy)

  px = pxpcap.PCap(name, callback = self._pcap_rx, start = False)
  px.port_no = phy.port_no
  self.px[phy.port_no] = px

  if start:
    px.start()

  return px

def remove_interface (self, name_or_num):
  """
  Removes a port given either its interface name or its port number

  Raises ValueError if no interface has the given name, and KeyError if
  there is no capture for the given port number.
  """
  if isinstance(name_or_num, basestring):
    # Translate the name to a port number first; the removal below then
    # happens outside of the iteration over self.px
    for no,p in self.px.items():
      if p.device == name_or_num:
        name_or_num = no
        break
    else:
      raise ValueError("No such interface")

  # Take the capture out of self.px so that later lookups by device name
  # can't match an interface which has already been removed
  px = self.px.pop(name_or_num)
  px.stop()
  # _pcap_rx() drops anything still delivered by a capture with no port
  px.port_no = None
  self.delete_port(name_or_num)

def _handle_GoingDownEvent (self, event):
self.q.put(None)

Expand Down Expand Up @@ -140,6 +257,7 @@ def rx_batch (self, batch):
self.rx_packet(data, port_no)

def _pcap_rx (self, px, data, sec, usec, length):
if px.port_no is None: return
self.q.put((px.port_no, data))

def _output_packet_physical (self, packet, port_no):
Expand All @@ -148,4 +266,6 @@ def _output_packet_physical (self, packet, port_no):
This is called by the more general _output_packet().
"""
self.px[port_no].inject(packet)
px = self.px.get(port_no)
if not px: return
px.inject(packet)
Loading

0 comments on commit 55a1301

Please sign in to comment.