#
# Copyright (C) 2007, 2008 Red Hat, Inc.
# Authors:
# Thomas Woerner
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
#
from copy import copy
from optparse import Option, OptionError, OptionParser, Values, \
SUPPRESS_HELP, BadOptionError, OptionGroup
import fw_config
from fw_functions import getPortID, getPortRange, getServiceName, checkIP, \
checkInterface
from fw_services import getByKey as getServiceByKey
from fw_icmp import getByKey as getICMPTypeByKey
import os.path
import sys
def _check_port(option, opt, value):
failure = False
try:
(ports, protocol) = value.split(":")
except:
failure = True
else:
range = getPortRange(ports.strip())
if range == -1:
failure = True
elif range == None:
raise OptionError(_("port range %s is not unique.") % value, opt)
elif len(range) == 2 and range[0] >= range[1]:
raise OptionError(_("%s is not a valid range (start port >= end "
"port).") % value, opt)
if not failure:
protocol = protocol.strip()
if protocol not in [ "tcp", "udp" ]:
raise OptionError(_("%s is not a valid protocol.") % protocol, opt)
if failure:
raise OptionError(_("invalid port definition %s.") % value, opt)
return (range, protocol)
def _check_rulesfile(option, opt, value):
type = "ipv4"
table = "filter"
splits = value.split(":", 1)
if len(splits) > 1 and splits[0] in fw_config.FIREWALL_TYPES:
type = splits[0]
splits = splits[1].split(":", 1)
if len(splits) > 1 and splits[0] in fw_config.FIREWALL_TABLES:
table = splits[0]
splits = splits[1].split(":", 1)
filename = ":".join(splits)
if type == "ipv6" and table == "nat":
raise OptionError(_("ipv6 has no nat support."), opt)
return (type, table, filename)
def _check_service(option, opt, value):
if not getServiceByKey(value):
raise OptionError(_("invalid service '%s'.") % value, opt)
return value
def _check_icmp_type(option, opt, value):
if not getICMPTypeByKey(value):
dict = { "option": opt, "value": value }
raise OptionError(_("option %(option)s: invalid icmp type "
"'%(value)s'.") % dict, opt)
return value
def _check_forward_port(option, opt, value):
result = { }
error = None
splits = value.split(":", 1)
while len(splits) > 0:
key_val = splits[0].split("=")
if len(key_val) != 2:
error = _("Invalid argument %s") % splits[0]
break
(key, val) = key_val
if (key == "if" and checkInterface(val)) or \
(key == "proto" and val in [ "tcp", "udp" ]) or \
(key == "toaddr" and checkIP(val)):
result[key] = val
elif (key == "port" or key == "toport") and getPortRange(val) > 0:
result[key] = getPortRange(val)
else:
error = _("Invalid argument %s") % splits[0]
break
if len(splits) > 1:
if splits[1].count("=") == 1:
# last element
splits = [ splits[1] ]
else:
splits = splits[1].split(":", 1)
else:
# finish
splits.pop()
if error:
dict = { "option": opt, "value": value, "error": error }
raise OptionError(_("option %(option)s: invalid forward_port "
"'%(value)s': %(error)s.") % dict, opt)
error = False
for key in [ "if", "port", "proto" ]:
if key not in result.keys():
error = True
if not "toport" in result.keys() and not "toaddr" in result.keys():
error = True
if error:
dict = { "option": opt, "value": value }
raise OptionError(_("option %(option)s: invalid forward_port "
"'%(value)s'.") % dict, opt)
return result
def _check_interface(option, opt, value):
if not checkInterface(value):
raise OptionError(_("invalid interface '%s'.") % value, opt)
return value
def _append_unique(option, opt, value, parser, *args, **kwargs):
vals = getattr(parser.values, option.dest)
if vals and value in vals:
return
parser.values.ensure_value(option.dest, []).append(value)
class _Option(Option):
TYPES = Option.TYPES + ("port", "rulesfile", "service", "forward_port",
"icmp_type", "interface")
TYPE_CHECKER = copy(Option.TYPE_CHECKER)
TYPE_CHECKER["port"] = _check_port
TYPE_CHECKER["rulesfile"] = _check_rulesfile
TYPE_CHECKER["service"] = _check_service
TYPE_CHECKER["forward_port"] = _check_forward_port
TYPE_CHECKER["icmp_type"] = _check_icmp_type
TYPE_CHECKER["interface"] = _check_interface
def _addStandardOptions(parser):
parser.add_option("--enabled",
action="store_true", dest="enabled", default=True,
help=_("Enable firewall (default)"))
parser.add_option("--disabled",
action="store_false", dest="enabled",
help=_("Disable firewall"))
parser.add_option("--addmodule",
action="callback", dest="add_module", type="string",
metavar=_(""), callback=_append_unique,
help=_("Enable an iptables module"))
parser.add_option("--removemodule",
action="callback", dest="remove_module", type="string",
metavar=_(""), callback=_append_unique,
help=_("Disable an iptables module"))
parser.add_option("-s", "--service",
action="callback", dest="services", type="service",
default=[ ],
metavar=_(""), callback=_append_unique,
help=_("Open the firewall for a service (e.g, ssh)"))
parser.add_option("-p", "--port",
action="callback", dest="ports", type="port",
metavar=_("[-]:"),
callback=_append_unique,
help=_("Open specific ports in the firewall "
"(e.g, ssh:tcp)"))
parser.add_option("-t", "--trust",
action="callback", dest="trust", type="interface",
metavar=_(""), callback=_append_unique,
help=_("Allow all traffic on the specified device"))
parser.add_option("-m", "--masq",
action="callback", dest="masq", type="interface",
metavar=_(""), callback=_append_unique,
help=_("Masquerades traffic from the specified device. "
"This is IPv4 only."))
parser.add_option( "--high", "--medium",
action="store_true", dest="enabled",
help=_("Backwards compatibility, aliased to --enabled"))
parser.add_option("--custom-rules",
action="callback", dest="custom_rules", type="rulesfile",
metavar=_("[:][:]"),
callback=_append_unique,
help=_("Specify a custom rules file for inclusion in "
"the firewall, after the "
"default rules. Default protocol type: ipv4, "
"default table: filter. "
"(Example: ipv4:filter:/etc/sysconfig/"
"ipv4_filter_addon)"))
parser.add_option("--forward-port",
action="callback", dest="forward_port",
type="forward_port",
metavar=_("if=:port=:proto="
"[:toport=]"
"[:toaddr=]"),
callback=_append_unique,
help=_("Forward the port with protocol for the "
"interface to either another local destination "
"port (no destination address given) or to an "
"other destination address with an optional "
"destination port. This is IPv4 only."))
parser.add_option("--block-icmp",
action="callback", dest="block_icmp", type="icmp_type",
default=[ ],
callback=_append_unique,
metavar=_(""),
help=_("Block this ICMP type. The default is to accept "
"all ICMP types."))
def _addCompatOptions(parser):
parser.add_option("--no-ipsec",
action="store_true", dest="no_ipsec",
help=_("Disable Internet Protocol Security (IPsec)"))
parser.add_option("--no-ipp",
action="store_true", dest="no_ipp",
help=_("Disable Internet Printing Protocol (IPP)"))
parser.add_option("--no-mdns",
action="store_true", dest="no_mdns",
help=_("Disable Multicast DNS (mDNS)"))
def _addSELinuxOptions(parser):
group = OptionGroup(parser, _("SELinux Options (deprecated)"),
_("Using these options with no additional firewall "
"options will not create or alter firewall "
"configuration, only SELinux will be configured."))
group.add_option("--selinux",
action="store", dest="selinux", type="choice",
metavar=_(""), choices=fw_config.SELINUX_MODES,
help=_("Configure SELinux mode: %s") % \
", ".join(fw_config.SELINUX_MODES))
group.add_option("--selinuxtype",
action="store", dest="selinuxtype", type="string",
metavar=_(""),
help=_("Configure SELinux type: Usually targeted or "
"strict Policy"))
parser.add_option_group(group)
def _parse_args(parser, args, options=None):
try:
(_options, _args) = parser.parse_args(args, options)
except Exception, error:
parser.error(error)
return None
if len(_args) != 0:
for arg in _args:
parser.error(_("no such option: %s") % arg)
if parser._fw_exit:
if fw_config.ui:
fw_config.ui.parse_exit(2)
else:
sys.exit(2)
if not hasattr(_options, "filename"):
_options.filename = None
if not hasattr(_options, "converted"):
_options.converted = False
return _options
class _OptionParser(OptionParser):
# overload print_help: rhpl._ returns UTF-8
def print_help(self, file=None):
if file is None:
file = sys.stdout
str = self.format_help()
if isinstance(str, unicode):
encoding = self._get_encoding(file)
str = str.encode(encoding, "replace")
file.write(str)
def print_usage(self, file=None):
pass
def exit(self, status=0, msg=None):
if msg:
if fw_config.ui:
fw_config.ui.parse_error(msg)
else:
print >>sys.stderr, msg
if not fw_config.ui:
self._fw_exit = True
def error(self, msg):
if self._fw_source:
text = "%s: %s" % (self._fw_source, msg)
else:
text = str(msg)
self.exit(2, msg=text)
def _match_long_opt(self, opt):
if self._long_opt.has_key(opt):
return opt
raise BadOptionError(opt)
def _process_long_opt(self, rargs, values):
# allow to ignore errors in the ui
try:
# OptionParser._process_long_opt(self, rargs, values)
self.__process_long_opt(rargs, values)
except Exception, msg:
self.error(msg)
def _process_short_opts(self, rargs, values):
# allow to ignore errors in the ui
try:
OptionParser._process_short_opts(self, rargs, values)
except Exception, msg:
self.error(msg)
def __process_long_opt(self, rargs, values):
arg = rargs.pop(0)
# Value explicitly attached to arg? Pretend it's the next
# argument.
if "=" in arg:
(opt, next_arg) = arg.split("=", 1)
had_explicit_value = True
else:
opt = arg
had_explicit_value = False
opt = self._match_long_opt(opt)
option = self._long_opt[opt]
if option.takes_value():
nargs = option.nargs
if len(rargs)+int(had_explicit_value) < nargs:
if nargs == 1:
self.error(_("%s option requires an argument") % opt)
else:
dict = { "option": opt, "count": nargs }
self.error(_("%(option)s option requires %(count)s "
"arguments") % dict)
elif nargs == 1 and had_explicit_value:
value = next_arg
elif nargs == 1:
value = rargs.pop(0)
elif had_explicit_value:
value = tuple([ next_arg ] + rargs[0:nargs-1])
del rargs[0:nargs-1]
else:
value = tuple(rargs[0:nargs])
del rargs[0:nargs]
elif had_explicit_value:
self.error(_("%s option does not take a value") % opt)
else:
value = None
option.process(opt, value, values, self)
def _gen_parser(source=None):
parser = _OptionParser(add_help_option=False, option_class=_Option)
parser._fw_source = source
parser._fw_exit = False
return parser
def parseSysconfigArgs(args, options=None, compat=False, source=None):
parser = _gen_parser(source)
_addStandardOptions(parser)
if compat:
_addCompatOptions(parser)
return _parse_args(parser, args, options)
def parseSELinuxArgs(args, options=None, source=None):
parser = _gen_parser(source)
_addSELinuxOptions(parser)
return _parse_args(parser, args, options)
def parseLokkitArgs(args=None, options=None, compat=False):
parser = _gen_parser()
parser.add_option("-?", "-h", "--help", "--usage", action="help",
help=_("Show this help message"))
parser.add_option("-q", "--quiet",
action="store_true", dest="quiet",
help=_("Run noninteractively; process only command-line "
"arguments"))
parser.add_option("-v", "--verbose",
action="store_true", dest="verbose",
help=_("Be more verbose"))
parser.add_option("--version",
action="store_true", dest="version",
help=_("Show version"))
parser.add_option("-n", "--nostart",
action="store_true", dest="nostart",
help=_("Configure firewall but do not activate the new "
"configuration"))
parser.add_option("-f",
action="store_true", dest="force",
help=_("Ignore actual settings"))
parser.add_option("--update",
action="store_true", dest="update",
help=_("Update firewall non-interactively if the "
"firewall is enabled. This will also restart the "
"firewall. The -n and -f options will be "
"ignored."))
parser.add_option("--default",
action="store", dest="default", type="choice",
metavar=_(""), choices=fw_config.DEFAULT_TYPES,
help=_("Set firewall default type: %s. "
"This overwrites any existing "
"configuration.") % ", ".join(fw_config.DEFAULT_TYPES))
parser.add_option("--list-services",
action="store_true", dest="list_services",
help=_("List predefined services."))
parser.add_option("--list-icmp-types",
action="store_true", dest="list_icmp_types",
help=_("List the supported icmp types."))
_addSELinuxOptions(parser)
_addStandardOptions(parser)
if len(sys.argv) < 2:
parser.print_help()
sys.exit(0)
_options = _parse_args(parser, args, options)
_options.nofw = False
if args == None and _options:
selinux = False
firewall = False
for arg in sys.argv[1:]:
if arg.startswith("--selinux"):
selinux = True
else:
firewall = True
if selinux and not firewall:
_options.nofw = True
return _options
def parseDBUSArgs(args=None, options=None, compat=False):
parser = _gen_parser()
parser.add_option("-v", "--verbose",
action="store_true", dest="verbose",
help=_("Be more verbose"))
parser.add_option("-n", "--nostart",
action="store_true", dest="nostart",
help=_("Configure firewall but do not activate the new "
"configuration"))
parser.add_option("-f",
action="store_true", dest="force",
help=_("Ignore actual settings"))
parser.add_option("--update",
action="store_true", dest="update",
help=_("Update firewall non-interactively if the "
"firewall is enabled. This will also restart the "
"firewall. The -n and -f options will be "
"ignored."))
parser.add_option("--default",
action="store", dest="default", type="choice",
metavar=_(""), choices=fw_config.DEFAULT_TYPES,
help=_("Set firewall default type: %s. "
"This overwrites any existing "
"configuration.") % ", ".join(fw_config.DEFAULT_TYPES))
_addSELinuxOptions(parser)
_addStandardOptions(parser)
return _parse_args(parser, args, options)
def copyValues(values):
if not values:
return None
new_values = Values()
new_values.__dict__ = copy(values.__dict__)
return new_values