Package netaddr :: Package ip
[hide private]
[frames] | no frames]

Source Code for Package netaddr.ip

  1  #!/usr/bin/env python
 
  2  #-----------------------------------------------------------------------------
 
  3  #   Copyright (c) 2008-2009, David P. D. Moss. All rights reserved.
 
  4  #
 
  5  #   Released under the BSD license. See the LICENSE file for details.
 
  6  #-----------------------------------------------------------------------------
 
  7  """
 
  8  Provides access to public network address information published by IANA.
 
  9  
 
 10  More details can be found at the following URLs :-
 
 11  
 
 12  Internet Assigned Numbers Authority (IANA)
 
 13   
 14      - http://www.iana.org/
 
 15      - http://www.iana.org/protocols/ 
 16  """ 
 17   
 18  import os as _os 
 19  import os.path as _path 
 20  import re as _re 
 21  
 
 22  from netaddr.core import Publisher, Subscriber, PrettyPrinter 
 23  from netaddr import AT_INET, AT_INET6 
 24  from netaddr.address import CIDR, IPRange 
 25   
 26  #----------------------------------------------------------------------------- 
 27   
 28  #: Topic based lookup dictionary for IANA information. 
 29  IANA_INFO = { 
 30      'IPv4'      : {}, 
 31      'IPv6'      : {}, 
 32      'multicast' : {}, 
 33  } 
 34  
 
 35  #-----------------------------------------------------------------------------
 
36 -class LineRecordParser(Publisher):
37 """ 38 A configurable Parser that understands how to parse line based records. 39 """
40 - def __init__(self, fh, **kwargs):
41 """ 42 Constructor. 43 44 fh - a valid, open file handle to line based record data. 45 """ 46 super(LineRecordParser, self).__init__() 47 self.fh = fh 48 49 self.__dict__.update(kwargs) 50 51 # Regex used to identify start of lines of interest. 52 if 're_start' not in self.__dict__: 53 self.re_start = r'^.*$' 54 55 # Regex used to identify line to be parsed within block of interest. 56 if 're_parse_line' not in self.__dict__: 57 self.re_parse_line = r'^.*$' 58 59 # Regex used to identify end of lines of interest. 60 if 're_stop' not in self.__dict__: 61 self.re_stop = r'^.*$' 62 63 # If enabled, skips blank lines after being stripped. 64 if 'skip_blank_lines' not in self.__dict__: 65 self.skip_blank_lines = False
66
67 - def parse_line(self, line):
68 """ 69 This is the callback method invoked for every line considered valid by 70 the line parser's settings. It is usually over-ridden by base classes 71 to provide specific line parsing and line skipping logic. 72 73 Any line can be vetoed (not passed to registered Subscriber objects) 74 by simply returning None. 75 """ 76 return line
77
78 - def parse(self):
79 """ 80 Parse and normalises records, notifying registered subscribers with 81 record data as it is encountered. 82 """ 83 record = None 84 section_start = False 85 section_end = False 86 87 for line in self.fh: 88 line = line.strip() 89 90 # Skip blank lines if required. 91 if self.skip_blank_lines and line == '': 92 continue 93 94 # Entered record section. 95 if not section_start and len(_re.findall(self.re_start, line)) > 0: 96 section_start = True 97 98 # Exited record section. 99 if section_start and len(_re.findall(self.re_stop, line)) > 0: 100 section_end = True 101 102 # Stop parsing. 103 if section_end: 104 break 105 106 # Is this a line of interest? 107 if section_start and len(_re.findall(self.re_parse_line, line)) == 0: 108 continue 109 110 if section_start: 111 record = self.parse_line(line) 112 113 # notify subscribers of final record details. 114 self.notify(record)
115 116 #-----------------------------------------------------------------------------
117 -class IPv4Parser(LineRecordParser):
118 """ 119 A LineRecordParser that understands how to parse and retrieve data records 120 from the IANA IPv4 address space file. 121 122 It can be found online here :- 123 124 http://www.iana.org/assignments/ipv4-address-space 125 """
126 - def __init__(self, fh, **kwargs):
127 """ 128 Constructor. 129 130 fh - a valid, open file handle to an OUI Registry data file. 131 132 kwargs - additional parser options. 133 134 """ 135 super(IPv4Parser, self).__init__(fh, 136 re_start=r'^Prefix', 137 re_parse_line=r'^\d{3}\/\d', 138 re_stop=r'^Notes\s*$', 139 skip_blank_lines=True, 140 ) 141 142 self.record_widths = ( 143 ('prefix', 0, 8), 144 ('designation', 8, 49), 145 ('date', 57, 10), 146 ('whois', 67, 20), 147 ('status', 87, 19), 148 )
149
150 - def parse_line(self, line):
151 """ 152 Callback method invoked for every line considered valid by the line 153 parser's settings. 154 155 See base class method for more details. 156 """ 157 record = {} 158 for (key, start, width) in self.record_widths: 159 value = line[start:start+width] 160 record[key] = value.strip() 161 162 # Strip leading zeros from octet. 163 if '/' in record['prefix']: 164 (octet, prefix) = record['prefix'].split('/') 165 record['prefix'] = "%d/%d" % (int(octet), int(prefix)) 166 167 record['status'] = record['status'].capitalize() 168 169 return record
170 171 #-----------------------------------------------------------------------------
172 -class IPv6Parser(LineRecordParser):
173 """ 174 A LineRecordParser that understands how to parse and retrieve data records 175 from the IANA IPv6 address space file. 176 177 It can be found online here :- 178 179 http://www.iana.org/assignments/ipv6-address-space 180 """
181 - def __init__(self, fh, **kwargs):
182 """ 183 Constructor. 184 185 fh - a valid, open file handle to an OUI Registry data file. 186 187 kwargs - additional parser options. 188 189 """ 190 super(IPv6Parser, self).__init__(fh, 191 re_start=r'^IPv6 Prefix', 192 re_parse_line=r'^[A-F0-9]+::\/\d+', 193 re_stop=r'^Notes:\s*$', 194 skip_blank_lines=True) 195 196 self.record_widths = ( 197 ('prefix', 0, 22), 198 ('allocation', 22, 24), 199 ('reference', 46, 15))
200
201 - def parse_line(self, line):
202 """ 203 Callback method invoked for every line considered valid by the line 204 parser's settings. 205 206 See base class method for more details. 207 """ 208 record = {} 209 for (key, start, width) in self.record_widths: 210 value = line[start:start+width] 211 record[key] = value.strip() 212 213 # Remove square brackets from reference field. 214 record[key] = record[key].lstrip('[') 215 record[key] = record[key].rstrip(']') 216 return record
217 218 #-----------------------------------------------------------------------------
219 -class MulticastParser(LineRecordParser):
220 """ 221 A LineParser that knows how to process the IANA IPv4 multicast address 222 allocation file. 223 224 It can be found online here :- 225 226 http://www.iana.org/assignments/multicast-addresses 227 """
228 - def __init__(self, fh, **kwargs):
229 """ 230 Constructor. 231 232 fh - a valid, open file handle to an OUI Registry data file. 233 234 kwargs - additional parser options. 235 236 """ 237 super(MulticastParser, self).__init__(fh, 238 re_start=r'^Registry:', 239 re_parse_line=r'^\d+\.\d+\.\d+\.\d+', 240 re_stop=r'^Relative', 241 skip_blank_lines=True)
242
243 - def normalise_addr(self, addr):
244 """ 245 Removes variations from address entries found in this particular file. 246 """ 247 if '-' in addr: 248 (a1, a2) = addr.split('-') 249 o1 = a1.strip().split('.') 250 o2 = a2.strip().split('.') 251 return "%s-%s" % ('.'.join([str(int(i)) for i in o1]), 252 '.'.join([str(int(i)) for i in o2])) 253 else: 254 o1 = addr.strip().split('.') 255 return '.'.join([str(int(i)) for i in o1])
256
257 - def parse_line(self, line):
258 """ 259 Callback method invoked for every line considered valid by the line 260 parser's settings. 261 262 See base class method for more details. 263 """ 264 index = line.find('[') 265 if index != -1: 266 line = line[0:index].strip() 267 (addr, descr) = [i.strip() for i in _re.findall( 268 r'^([\d.]+(?:\s*-\s*[\d.]+)?)\s+(.+)$', line)[0]] 269 addr = self.normalise_addr(addr) 270 descr = ' '.join(descr.split()) 271 descr = descr.replace('Date registered' , '').rstrip() 272 273 return dict(address=addr, descr=descr)
274 275 #-----------------------------------------------------------------------------
276 -class DictUpdater(Subscriber):
277 """ 278 Concrete Subscriber that inserts records received from a Publisher into a 279 dictionary. 280 """
281 - def __init__(self, dct, topic, unique_key):
282 """ 283 Constructor. 284 285 dct - lookup dict or dict like object to insert records into. 286 287 topic - high-level category name of data to be processed. 288 289 unique_key - key name in data dict that uniquely identifies it. 290 """ 291 self.dct = dct 292 self.topic = topic 293 self.unique_key = unique_key
294
295 - def update(self, data):
296 """ 297 Callback function used by Publisher to notify this Subscriber about 298 an update. Stores topic based information into dictionary passed to 299 constructor. 300 """ 301 data_id = data[self.unique_key] 302 303 if self.topic == 'IPv4': 304 cidr = CIDR(data_id) 305 self.dct[cidr] = data 306 elif self.topic == 'IPv6': 307 cidr = CIDR(data_id) 308 self.dct[cidr] = data 309 elif self.topic == 'multicast': 310 iprange = None 311 if '-' in data_id: 312 (first, last) = data_id.split('-') 313 iprange = IPRange(first, last) 314 else: 315 iprange = IPRange(data_id, data_id) 316 self.dct[iprange] = data
317 318 #-----------------------------------------------------------------------------
319 -def load_iana_info():
320 """ 321 Parse and load internal IANA data lookups with the latest information from 322 data files. 323 """ 324 PATH = _path.dirname(__file__) 325 326 ipv4 = IPv4Parser(open(_path.join(PATH, 'ipv4-address-space'))) 327 ipv4.attach(DictUpdater(IANA_INFO['IPv4'], 'IPv4', 'prefix')) 328 ipv4.parse() 329 330 ipv6 = IPv6Parser(open(_path.join(PATH, 'ipv6-address-space'))) 331 ipv6.attach(DictUpdater(IANA_INFO['IPv6'], 'IPv6', 'prefix')) 332 ipv6.parse() 333 334 mcast = MulticastParser(open(_path.join(PATH, 'multicast-addresses'))) 335 mcast.attach(DictUpdater(IANA_INFO['multicast'], 'multicast', 'address')) 336 mcast.parse()
337 338 #-----------------------------------------------------------------------------
339 -def query(ip_addr):
340 """ 341 Returns informational data specific to this IP address. 342 """ 343 info = {} 344 345 if ip_addr.addr_type == AT_INET: 346 for cidr, record in IANA_INFO['IPv4'].items(): 347 if ip_addr in cidr: 348 info.setdefault('IPv4', []) 349 info['IPv4'].append(record) 350 351 if ip_addr.is_multicast(): 352 for iprange, record in IANA_INFO['multicast'].items(): 353 if ip_addr in iprange: 354 info.setdefault('Multicast', []) 355 info['Multicast'].append(record) 356 357 elif ip_addr.addr_type == AT_INET6: 358 for cidr, record in IANA_INFO['IPv6'].items(): 359 if ip_addr in cidr: 360 info.setdefault('IPv6', []) 361 info['IPv6'].append(record) 362 363 return info
364 365 #-----------------------------------------------------------------------------
366 -def get_latest_files():
367 """Download the latest files from IANA""" 368 import urllib2 369 370 urls = [ 371 'http://www.iana.org/assignments/ipv4-address-space', 372 'http://www.iana.org/assignments/ipv6-address-space', 373 'http://www.iana.org/assignments/multicast-addresses', 374 ] 375 376 for url in urls: 377 print 'downloading latest copy of %s' % url 378 request = urllib2.Request(url) 379 response = urllib2.urlopen(request) 380 save_path = _path.dirname(__file__) 381 basename = _os.path.basename(response.geturl().rstrip('/')) 382 filename = _path.join(save_path, basename) 383 fh = open(filename, 'wb') 384 fh.write(response.read()) 385 fh.close()
386 387 388 #----------------------------------------------------------------------------- 389 if __name__ == '__main__': 390 # Generate indices when module is executed as a script. 391 get_latest_files() 392 393 # On module import, read IANA data files and populate lookups dict. 394 load_iana_info() 395