summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/__init__.py0
-rw-r--r--lib/config.py16
-rw-r--r--lib/core.py139
-rw-r--r--lib/model.py181
4 files changed, 336 insertions, 0 deletions
diff --git a/lib/__init__.py b/lib/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/lib/__init__.py
diff --git a/lib/config.py b/lib/config.py
new file mode 100644
index 0000000..f4fab70
--- /dev/null
+++ b/lib/config.py
@@ -0,0 +1,16 @@
+import configparser
+
+class Config:
+ _instance = None
+
+ def __new__(cls, *args, **kwargs):
+ if not cls._instance:
+ cls._instance = super(Config, cls).__new__(
+ cls, *args, **kwargs)
+
+ return cls._instance
+
+ def __init__(self):
+ self.parser = configparser.ConfigParser()
+ self.parser.read('pywhoisd.conf') # Find a way not to hardcode this
+
diff --git a/lib/core.py b/lib/core.py
new file mode 100644
index 0000000..e7ac571
--- /dev/null
+++ b/lib/core.py
@@ -0,0 +1,139 @@
+import socketserver
+import ipaddr
+import re
+
+from lib.config import Config
+
+class Daemon():
+ def __init__(self, data):
+ self.data = data
+ self.domain_regexp = re.compile("(?!-)[A-Z\d-]{1,63}(?<!-)$", re.IGNORECASE)
+ self.footer = None
+ self.header = None
+
+ def query(self, q):
+ if self.is_ip(q):
+ result = self.search_ip(q)
+ elif self.is_domain(q):
+ result = self.search_domain(q)
+ else:
+ # Try to find a person with the given query
+ person = search_person(q)
+ if person:
+ result = person
+ else:
+ result = self.get_help()
+
+ return self.format_result(result)
+
+ def format_result(self, values):
+ """Receive a dictionary and return a string"""
+ result = ""
+ for k, v in values.items():
+ if k == 'domains':
+ result += '\nAssociated domains\n'
+ for i, dom in enumerate(v):
+ result += ' {0}\n'.format(dom.name)
+ elif k == 'admins':
+ result += '\nAssociated administrator/s\n '
+ for i, adm in enumerate(v):
+ result += ' {0} {1} <{2}>\n'.format(adm.name, adm.surname, adm.email)
+ else:
+ result += '{0}: {1}\n'.format(k, v)
+
+ return result
+
+ def search_ip(self, ip):
+ result = {}
+
+ # Iterate over all IP block elements
+ networks = self.data.get_networks()
+ for key in networks:
+ for block in networks[key].ip_blocks:
+ if ipaddr.IPAddress(ip) in ipaddr.IPNetwork(block):
+ return networks[key].as_dict(self.data)
+
+ result['error'] = "Network not found"
+ return result
+
+ def search_domain(self, domain):
+ # Iterate over all network and check its domains
+ networks = self.data.get_networks()
+ domains = self.data.get_domains()
+ for network in networks.values():
+ if any(domains[d].name == domain for d in network.domains):
+ return network.as_dict(self.data)
+
+ return {'error':'Domain not found'}
+
+ # TODO
+
+ def search_person(self, query):
+ pass
+
+ def is_ip(self, query):
+ try:
+ ipaddr.IPAddress(query)
+ return True
+ except ValueError:
+ return False
+
+ def is_domain(self, hostname):
+ if len(hostname) > 255:
+ return False
+
+ if hostname[-1:] == ".":
+ hostname = hostname[:-1] # strip exactly one dot from the right, if present
+
+ return all(self.domain_regexp.match(x) for x in hostname.split("."))
+
+ # TODO
+ def get_help(self):
+ return "This will be the help"
+
+ def get_footer(self):
+ if not self.footer:
+ f = open(Config().parser['Printing']['footer'])
+ self.footer = f.read()
+ f.close()
+
+ return self.footer
+
+ def get_header(self):
+ if not self.header:
+ f = open(Config().parser['Printing']['header'])
+ self.header = f.read()
+ f.close()
+
+ return self.header
+
+class WhoisHandler(socketserver.BaseRequestHandler):
+
+ def setup(self):
+ self.daemon = self.server.daemon
+
+ def handle(self):
+ data = str(self.request.recv(100).strip(), 'utf-8')
+ print('Received: {}'.format(data))
+
+ response = self.daemon.get_header()
+ response += self.daemon.query(data)
+ response += self.daemon.get_footer()
+
+ self.request.sendall(bytes(response, 'utf-8'))
+
+class ClassicServer(socketserver.ThreadingTCPServer):
+ def __init__(self, daemon):
+ host = Config().parser['Servers']['classic_host']
+ port = int(Config().parser['Servers']['classic_port'])
+ self.daemon = daemon
+
+ socketserver.ThreadingTCPServer.__init__(self, (host, port), WhoisHandler)
+
+class WebServer(socketserver.ThreadingTCPServer):
+ def __init__(self, daemon):
+ self.host = Config().parser['Servers']['web_host']
+ self.port = int(Config().parser['Servers']['web_port'])
+ self.daemon = daemon
+
+ socketserver.ThreadingTCPServer.__init__(self, (self.host, self.port), WhoisHandler)
diff --git a/lib/model.py b/lib/model.py
new file mode 100644
index 0000000..933d901
--- /dev/null
+++ b/lib/model.py
@@ -0,0 +1,181 @@
+from xml.etree.ElementTree import ElementTree
+
+from lib.config import Config
+
+class Network():
+ """A simple network definition"""
+
+ def __init__(self):
+ self.name = ""
+ self.domains = []
+ self.admins = []
+ self.ip_blocks = []
+ self.data = {}
+
+ def as_dict(self, data):
+ # Beautify
+ result = {}
+ domains = data.get_domains()
+ persons = data.get_persons()
+
+ result['name'] = self.name
+
+ # Domains
+ result['domains'] = []
+ for d in self.domains:
+ result['domains'].append(domains[d])
+
+ # Admins
+ result['admins'] = []
+ for a in self.admins:
+ result['admins'].append(persons[a])
+
+ # Networks
+ result['networks'] = self.ip_blocks
+
+ return result
+
+class Domain():
+ """A simple domain definition"""
+
+ def __init__(self):
+ self.name = ""
+ self.admins = []
+ self.data = {}
+
+ def add_admin(self, admin):
+ """Add an administrator for this network"""
+
+ self.admins.append(admin)
+
+class Person():
+ """A simple person definition"""
+
+ def __init__(self):
+ self.name = ""
+ self.surname = ""
+ self.email = ""
+ self.data = {}
+
+class Data():
+ """Abstract class for storing and getting information"""
+
+ def __init__(self):
+ self.config = Config().parser
+ self.networks = None
+ self.domains = None
+ self.persons = None
+
+ def parse_config(self):
+ """Parse neccesary config params depending on the method used
+
+ Abstract method"""
+
+ pass
+
+ def load_data(self):
+ """Load data from defined source.
+
+ Abstract method"""
+
+ pass
+
+ def get_networks(self):
+ """Return all networks. Common method for all kind of storages."""
+
+ if self.networks == None:
+ self.load_data()
+
+ return self.networks
+
+ def get_domains(self):
+ """Return all domains. Common method for all kind of storages."""
+
+ if self.networks == None:
+ self.load_data()
+
+ return self.domains
+
+ def get_persons(self):
+ """Return all persons. Common method for all kind of storages."""
+
+ if self.persons == None:
+ self.load_data()
+
+ return self.persons
+
+class DataXML(Data):
+ """Reads network information from a XML file"""
+
+ def parse_config(self):
+ """Reads and sets up XML config file fields"""
+
+ self.data_file = self.config['Storage']['xml_file']
+
+ def load_data(self):
+ """Parse XML for getting network information"""
+
+ self.parse_config()
+ root = ElementTree(file=self.data_file).getroot()
+
+ self.networks = {}
+ self.domains = {}
+ self.persons = {}
+
+ for elem in root:
+ if elem.tag == 'person':
+ self.add_person(elem)
+ elif elem.tag == 'domain':
+ self.add_domain(elem)
+ elif elem.tag == 'network':
+ self.add_network(elem)
+ else:
+ pass # raise TagNotFoundError
+
+ def add_person(self, elem):
+ """Adds a new person"""
+
+ person = Person()
+ for e in elem:
+ if e.tag == 'name':
+ person.name = e.text
+ elif e.tag == 'surname':
+ person.surname = e.text
+ elif e.tag == 'email':
+ person.email = e.text
+ else:
+ person.data[e.tag] = e.text
+
+ print("[+] Read person: {0} - {1} - {2}".format(person.name, person.surname, person.email))
+ self.persons[elem.attrib['id']] = person
+
+ def add_domain(self, elem):
+ """Adds a new domain"""
+
+ domain = Domain()
+ for e in elem:
+ if e.tag == 'name':
+ domain.name = e.text
+ else:
+ domain.data[e.tag] = e.text
+
+ print("[+] Read domain: {}".format(domain.name))
+ self.domains[elem.attrib['id']] = domain
+
+ def add_network(self, elem):
+ """Adds a new network"""
+
+ network = Network()
+ for e in elem:
+ if e.tag == 'name':
+ network.name = e.text
+ elif e.tag == 'domain':
+ network.domains.append(e.text)
+ elif e.tag == 'ip_block':
+ network.ip_blocks.append(e.text)
+ elif e.tag == 'admin':
+ network.admins.append(e.text)
+ else:
+ network.data[e.tag] = e.text
+
+ self.networks[elem.attrib['id']] = network
nihil fit ex nihilo