DNS/実装/python/dnslib/sample-resolver/interceptについて、ここに記述してください。 これがひな型としていいかな。-- ToshinoriMaeno <> https://docs.python.org/3/library/argparse.html {{{#!python # -*- coding: utf-8 -*- """ InterceptResolver - proxy requests to upstream server (optionally intercepting) """ from __future__ import print_function import binascii,copy,socket,struct,sys from dnslib import DNSRecord,RR,QTYPE,RCODE,parse_time from dnslib.server import DNSServer,DNSHandler,BaseResolver,DNSLogger from dnslib.label import DNSLabel class InterceptResolver(BaseResolver): """ Intercepting resolver Proxy requests to upstream server optionally intercepting requests matching local records """ def __init__(self,address,port,ttl,intercept,skip,nxdomain,timeout=0): """ address/port - upstream server ttl - default ttl for intercept records intercept - list of wildcard RRs to respond to (zone format) skip - list of wildcard labels to skip nxdomain - list of wildcard labels to retudn NXDOMAIN timeout - timeout for upstream server """ self.address = address self.port = port self.ttl = parse_time(ttl) self.skip = skip self.nxdomain = nxdomain self.timeout = timeout self.zone = [] for i in intercept: if i == '-': i = sys.stdin.read() for rr in RR.fromZone(i,ttl=self.ttl): self.zone.append((rr.rname,QTYPE[rr.rtype],rr)) def resolve(self,request,handler): reply = request.reply() qname = request.q.qname qtype = QTYPE[request.q.qtype] # Try to resolve locally unless on skip list if not any([qname.matchGlob(s) for s in self.skip]): for name,rtype,rr in self.zone: if qname.matchGlob(name) and (qtype in (rtype,'ANY','CNAME')): a = copy.copy(rr) a.rname = qname reply.add_answer(a) # Check for NXDOMAIN if any([qname.matchGlob(s) for s in self.nxdomain]): reply.header.rcode = getattr(RCODE,'NXDOMAIN') return reply # Otherwise proxy if not reply.rr: try: if handler.protocol == 'udp': proxy_r = request.send(self.address,self.port, timeout=self.timeout) else: proxy_r = request.send(self.address,self.port, tcp=True,timeout=self.timeout) reply = DNSRecord.parse(proxy_r) except socket.timeout: reply.header.rcode = getattr(RCODE,'NXDOMAIN') return reply if __name__ == '__main__': import argparse,sys,time p = argparse.ArgumentParser(description="DNS Intercept Proxy") p.add_argument("--port","-p",type=int,default=53, metavar="", help="Local proxy port (default:53)") p.add_argument("--address","-a",default="", metavar="
", help="Local proxy listen address (default:all)") p.add_argument("--upstream","-u",default="8.8.8.8:53", metavar="", help="Upstream DNS server:port (default:8.8.8.8:53)") p.add_argument("--tcp",action='store_true',default=False, help="TCP proxy (default: UDP only)") p.add_argument("--intercept","-i",action="append", metavar="", help="Intercept requests matching zone record (glob) ('-' for stdin)") p.add_argument("--skip","-s",action="append", metavar="