*本文原創作者:VillanCh,本文屬FreeBuf原創獎勵計畫,未經許可禁止轉載
回顧
我們在上一篇文章中討論TDD開發滲透工具,並且完成了我們HTTP代理掃描器的一個功能模組。那麼我們接下來就繼續使用TDD完成接下來的部分,大家也深入理解一下這種開發管道的好處(可是程式碼量大啊~hhhh)。
如何繼續上次的任務?
先運行測試用例!如果上次的任務沒有完成,你就需要繼續調整你的程式碼,修改,通過測試用例,這就是你首先要做的,當然如果上次的任務你完成了,測試用例順利通過,那麼當然好啊,接下來你就可以直接再寫一個測試用例,開始一個新的功能或者模塊了。
繼續
在之前的文章中我們完成了針對一個地址的代理驗證,那麼,我們接下來應該怎麼做呢?如果我們有一堆地址需要檢測,這樣我們就要處理兩大問題:
安全的多執行緒(多進程)併發
正確收取結果
短小精幹的併發處理
這樣的話,涉及到多執行緒程式設計,保證執行緒安全什麼BlaBla,對於一個滲透測試愛好者來說,可能會稍微有一點挑戰:(我平時寫的爆破甚至都是單執行緒的!),寫多執行緒還是有點麻煩的!當然我這裡也有考慮到這一點,在寫過很多類似的腳本之後,最近也算是首創了一種短小精悍的併發框架,具體的開發流程自然也是TDD,這裡我就不會和上次一樣很細緻的一步一步來講到底是怎麼寫的。我們就一起來看一下吧!
def test_pool(self):
'''测试并发'''
#定义一个任务函数,参数 *args 不定长
def demo_task(*args):
'''simulate the plugin.run'''
print '[!] Computing!'
time.sleep(args[0])
print 'Sleep : ', args[0]
print '[!] Finished!'
print
returns = 'Runtime Length : %s' % str(args)
return returns
#基本使用方法
#1. 建立一个并发池对象,
#2. 添加任务
#3. 启动
#4. 通过一个队列获取结果
pool = Pool()
pool.add_task(demo_task, 7)
pool.add_task(demo_task, 3)
q = pool.run()
self.assertIsInstance(q, Queue)
r = q.get()
print r
self.assertIsInstance(r, str)
pool.add_task(demo_task, 4)
pool.add_task(demo_task, 2)
pool.add_task(demo_task, 1)
pool.add_task(demo_task, 6)
pool.add_task(demo_task, 7)
pool.add_task(demo_task, 1)
pool.add_task(demo_task, 6)
pool.run()
r = q.get()
print '~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~' ,r
大家會發現,在測試的過程中,已經寫明了這個併發池的基本使用方法,當然這個使用方法不是根據Pool原始程式碼寫出的,而是我自己“瞎”寫的:也就是說,我並不知道原始程式碼,但是我設想我的小框架應該這麼寫,所以我就這麼寫了,至於怎麼實現,不是我寫測試用例需要考慮的問題。所以,現在覺得,這個框架是非常的用戶友好啊:至少對我自己來說用起來是非常的舒服。換句話來說,我在寫上面的程式碼的時候,其實並不知道自己的控制併發的小玩意叫什麼內部內部什麼構造,需要什麼模塊,需要什麼函數,或者有沒有什麼特殊内容之類的,就只是“我覺得他應該是這樣的”而已。至於寫下了測試用例以後發生了什麼我覺得就不用再多說了,大家都可以猜得到就是不停測試不停修改程式碼直到成功完成需要的功能。(當然這個測試程式的測試程式碼就不會面面俱到,滿足我們需求的程式碼就OK)
期待的運行結果如下:
[!] Computing!
[!] Computing!
Sleep : 3
[!] Finished!
Runtime Length : (3,)
[!] Computing!
[!] Computing!
[!] Computing!
[!] Computing!
[!] Computing!
[!] Computing!
[!] Computing!
Sleep : 1
Sleep : [!] Finished!
1
[!] Finished!
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Runtime Length : (1,)
.
----------------------------------------------------------------------
Ran 1 test in 4.033s
OK
Sleep : 2
[!] Finished!
Sleep : 7
[!] Finished!
Sleep : 4
[!] Finished!
Sleep : 6
[!] Finished!
Sleep : 6
[!] Finished!
Sleep : 7
[!] Finished!
獲取3s和1s的兩個結果,其他的分開操作不去獲取結果,測試在4.033s結束,其餘的行程後臺運行,但是不會被獲取結果:成功達到了初衷。
源碼在下麵:
class Pool(object):
"""Thread or Proccess Pool to support the
concurrence of many tasks"""
#----------------------------------------------------------------------
def __init__(self, thread_max=50, mode='Thread'):
"""Constructor"""
modes = ['thread', 'process']
self.mode = mode.lower() if mode in modes else 'thread'
self.task_list = []
self.result_queue = Queue()
self.signal_name = self._uuid1_str()
self.lock = threading.Lock() if self.mode == 'thread' else multiprocessing.Lock()
self.thread_max = thread_max
self.current_thread_count = 0
def _uuid1_str(self):
'''Returns: random UUID tag '''
return str(uuid.uuid1())
def add_task(self, func, *args, **argv):
'''Add task to Pool and wait to exec
Params:
func : A callable obj, the entity of the current task
args : the args of [func]
argv : the argv of [func]
'''
assert callable(func), '[!] Function can \'t be called'
ret = {}
ret['func'] = func
ret['args'] = args
ret['argv'] = argv
ret['uuid'] = self.signal_name
self.task_list.append(ret)
def run(self):
""""""
self._init_signal()
Thread(target=self._run).start()
return self.result_queue
#----------------------------------------------------------------------
def _run(self):
""""""
for i in self.task_list:
#print self.current_thread_count
while self.thread_max <= self.current_thread_count:
time.sleep(0.3)
self._start_task(i)
def _start_task(self, task):
""""""
self.current_thread_count = self.current_thread_count + 1
try:
if self.mode == 'thread':
#print 'Start'
Thread(target=self._worker, args=(task,)).start()
elif self.mode == 'process':
Process(target=self._worker, args=(task,)).start()
except TypeError:
self.current_thread_count = self.current_thread_count - 1
def _worker(self, dictobj):
""""""
func = dictobj['func']
args = dictobj['args']
argv = dictobj['argv']
result = func(*args, **argv)
self.lock.acquire()
self._add_result_to_queue(result=result)
self.lock.release()
def _add_result_to_queue(self, **kw):
""""""
assert kw.has_key('result'), '[!] Result Error!'
self.result_queue.put(kw['result'])
self.current_thread_count = self.current_thread_count - 1
嗯,總之,這一百行不到程式碼可以實現:
控制線程數量監控:current_threads
簡單易用的一次執行多個任務介面
非同步獲取結果
隨時添加任務
我們想要的是批量執行掃描IP代理,所以上面的東西完全滿足我們需求了。下麵我就簡單來介紹一下上面的可以控制併發的Pool的流程,方便需要的讀者可以參考一下:
但是由於我們的工具是自己使用,或者說是實驗用,就不考慮Treading/eventlet/stackless的選擇問題了,也不烦乱給這個小工具命名的問題。(雖然我覺得這樣說是有一些不負責任的)
Treading/eventlet/stackless
獲取與解析目標
拿到目標地址,我們既然是掃描肯定是批量掃描,那麼我們肯定是要批量獲取IP地址。我們說的獲取IP地址當然不是DNS査詢,我們說的當然是我們需要怎麼處理很多個IP地址:用戶指定一個範圍;或者僅僅是隨機掃描特定區域的網段。想想,大多也就這樣了,不會有新的途徑了吧。然後IP地址有了,埠呢?我們怎麼樣選擇掃描的埠?首先,我們肯定不能掃描全部的埠,因為這樣速度太慢了,而且比如3306 5432 111 22 25這些埠顯然不可能是一個http代理埠,我們就沒必要對這些埠進行檢查,根據經驗來說80-908080-8090這些埠是一個http代理的可能行比較大,再縮小一點,808080812331289000。就先這麼多吧!那麼我們獲取和解析目標的思路就有了:獲取一堆IP地址,然後用這一堆IP地址與想要檢查的埠進行組合,然後把組合結果分配給我們上面剛剛完成的併發框架,然後收取結果。好的,思路我們清楚之後,就需要開始動手完成這個部分了,沒錯,首先就是編寫測試:
class ParseTargetTest(unittest.case.TestCase):
"""Test Parse Target"""
#--------------------------------------------------------------
def runTest(self):
IPs_1 = '123.1.2.0/24'
IPs_2 = '123.1.2.3-123.1.3.5'
ip_gen = generate_target(IPs_1, PORTS)
self.assertIsInstance(ip_gen, iter)
for i in ip_gen:
ip_port = i.split(':')
try:
IPy.IP(ip_port[0])
except ValueError:
pass
ip_gen = generate_target(IPs_2, PORTS)
self.assertIsInstance(ip_gen, iter)
for i in ip_gen:
ip_port = i.split(':')
try:
IPy.IP(ip_port[0])
except ValueError:
pass
這樣的測試如果跑通的話,我們只需要通過一個generate_target這個函數,就可以解析IP地址了,可以解析IP地址段,也可以解析單個IP地址,返回值是通過yield產生generator。根據這樣的測試用例我們使用IPy模塊,進行基本的IP運算(IP地址網絡運算),我們很容易就可以解决上面的問題:
import unittest
import re
import IPy
from IPy import IP
PORTS = ['80', '8080', '8123', '3128', '82']
#----------------------------------------------------------------------
def generate_target(IPs, ports):
"""Generate the target for scanning HTTP proxy
Returns:
A Generator: """
gen = None
if '-' in IPs:
pairs = IPs.split('-')
start = pairs[0]
end = pairs[1]
gen = int2ips(IP(start).int(), IP(end).int())
else:
gen = IP(IPs)
for i in gen:
for port in ports:
yield ':'.join([i.__str__(), port])
#----------------------------------------------------------------------
def int2ips(start, end):
""""""
for i in xrange(int(start), int(end)):
yield IPy.intToIp(i, version=4)
上面的程式碼,看起來甚至還要比測試程式碼還要短,其實這就是TDD的魅力之一啊,編寫出最簡潔最少的滿足功能的程式碼。
架構探討與綜合功能
現在我們的基本功能都完成了,我們現在應該做什麼呢?先想一下我們這個功能的具體流程:
生成目標
多執行緒掃描
收集結果
大致是這個樣子,但是我們仔細想一想,好像還需要一點別的東西:
用戶互動(命令列工具or Web介面工具?)
工具架構以及擴展性(萬一我以後除了掃描代理,還想掃描別的東西呢?)
萬幸的是,我們完成的功能是分離的,那麼我們現在需要考慮的就是怎麼把功能拼接起來。我們一開始定位是一個代理掃描器,但是現在,我們如果改動一下檢查代理(proxy_check)模塊,如果改成針對某個埠的EXP利用的話,這個工具顯然也是可以滿足針對某個埠(或者針對某項服務)的掃描-探測-利用的。換一句話來說,我們現在就想實現proxy_check的挿件化,那麼,問題就來了,現在想要實現我們為了實現挿件化解耦(當然我們面對的情景很簡單),就需要考慮一下如何挿件化?萬幸的是Python提供了一個pluginbase的模塊,但是我們並不打算使用這一個模塊,就編寫一個挿件基類,然後讓所有的挿件去繼承這個基類,然後我們在工具中調用挿件基類的介面就可以,這樣就可以實現調用多種類型的功能模組了,現在也就只需要所有的功能模組都繼承挿件並且按照一定規範完成功能編寫。這樣來想的話,我們的架構也就非常明朗了:
那麼我們發現如果怕這樣設計的話,顯然我們需要重新編寫基類,然後稍微改造一下原來的功能模組。其實是非常簡單的,我們的挿件其實只需要傳入參數->執行->處理輸出就可以了,再加上基本的内容,什麼挿件名稱,挿件描述,就可以完成了不是麼?
class PluginBaseClass(object):
""""""
__mateclass__ = abc.ABCMeta
#----------------------------------------------------------------------
def __init__(self):
"""Constructor"""
pass
#----------------------------------------------------------------------
@abc.abstractproperty
def name(self):
""""""
return '...'
#----------------------------------------------------------------------
@abc.abstractproperty
def description(self):
""""""
return '...'
#----------------------------------------------------------------------
@abc.abstractmethod
def _work(self, *args, **kwargs):
""""""
return '...'
#----------------------------------------------------------------------
@abc.abstractmethod
def _parse_result(self, result):
""""""
return "..."
#----------------------------------------------------------------------
def start(self, *args, **kwargs):
""""""
#print 'cl'
ret = self._work(*args, **kwargs)
#SOMETHING IF NEEDED
return self._parse_result(ret)
這樣我們定義了挿件的基類,繼承這個挿件,必須複寫的特性和方法有:
name内容
description内容
_work方法
_parse_result方法
這樣滿足了最基本挿件的要求了吧:至少要有個名字,有個簡單描述,有功能函數,結果解析函數。(關於結果解析的必要性呢?我們考慮如果以後需要添加更多的不同類型的功能挿件的話,結果肯定不可能是完全統一的,囙此我們需要一個解析結果的函數幫我們完成一些工作)。那麼我們主函數負責任務發佈,只需要調用所有挿件的start函數傳入任意的參數就可以完成操作了(但是參數的處理我們肯定要先在程式中完成的對吧?)。
重構功能函數!
當然我們要把功能函數改造成(或者再次編寫)一個挿件才能丟給程式執行。怎麼改造呢?上一節講過的,就是繼承挿件,然後複寫該複寫的方法和内容就可以完成了。當然,我們動手肯定應該先去寫測試方法對不對?先有測試後有程式碼。值得一說的是,這次在重寫測試方法的時候就有點意思了,我們可以把自己暫且當成主程序,主程序來調用這個挿件,應該怎麼樣調用呢?
def test_plugin_ins(self):
''''''
pprint('Plugin Test')
ret = CheckProxyPlugin()
_result = ret.start('45.78.5.48:333')
for _ in _result:
pprint(ret.name)
pprint(ret.description)
pprint(_)
大概這就我們理想的樣子吧:介面簡單完備,完成的功能什麼大家心裡有數,那麼挿件究竟是什麼樣子的呢?其實非常簡單。並沒有直接去複製程式碼,而是採用調用原功能的方法進行簡單構造:
class CheckProxyPlugin(PluginBaseClass):
""""""
_name = 'Check Proxy'
_description = 'Check if the specific addr(IP:PORT) is a \
HTTP(s) proxy'
#----------------------------------------------------------------------
@property
def name(self):
"""getter for self._name"""
return self._name
#----------------------------------------------------------------------
@property
def description(self):
"""getter for self._description"""
return self._description
#----------------------------------------------------------------------
def _work(self, *args, **kwargs):
""""""
ret = CheckProxy(args[0])
return ret.test(timeout=int(kwargs['timeout']) if kwargs.has_key('timeout')
else 3)
#----------------------------------------------------------------------
def _parse_result(self, result):
""""""
return result
其實這樣就完成了這個挿件了,然後我們對這個挿件的調用只有獲取挿件實例,調用方法,等結果了。
其他的一些小工作
(╯`□′)╯(┻━┻現在基本該有的東西都有了而且,接下來就來做一些“填充”之類的東西了:
解析命令列(互動編寫)
主程序調用
結果處理
互動
這裡既然是一個實驗小程式,我們就簡單寫一個命令列解析吧,之前提到的採用argparser模塊。對於接下來收尾的小東西,TDD當然也適用,但是為了解决時間,我們就簡單過一下吧,剩下的都不是重點了。
#----------------------------------------------------------------------
def parse_args():
""""""
parser = argparse.ArgumentParser(description='Pr0xy Scan and collect proxy info')
parser.add_argument('IP',
help='''The IP you want to check: \nINPUT format:\ 1.2.3.4-1.2.5.6 or 45.67.89.0/24 or
45.78.1.48''')
parser.add_argument('--ports', dest='ports',
help='The ports you want to check, Plz input single port\
or with a format like: 82,80 or 100-105')
parser.add_argument('--type', dest='type',
help='Type of scan [Now Just Support proxy]')
args = parser.parse_args()
args.IP
#print args.ports
#pprint(args)
PORTS = []
raw_ports = args.ports
if '-' in raw_ports:
_ = raw_ports.split('-')
_min = _[0]
_max = _[1]
PORTS = range(_min, _max)
elif ',' in raw_ports:
_list = raw_ports.split(',')
for _ in _list:
if _.isdigit():
PORTS.append(int(_))
else:
pass
#PORTS.append(object)
if PORTS == []:
PORTS = parse_target.PORTS
else:
if raw_ports.isdigit():
PORTS.append(int(raw_ports))
else:
PORTS = parse_target.PORTS
#pprint(args.IP)
#pprint(PORTS)
return (args.IP, PORTS, args.type)
這裡還是有必要解釋一下參數:
- IP就表示IP地址,你可以輸入單個或者多個(必須是點分十進位),當然形式限制三種:1.單個IP地址2.短橫線連接符表示的IP範圍(例如1.2.3.4-1.2.3.9這表示6個IP)3.帶有遮罩的網絡範圍(例如1.2.3.0 /24)
- 1.單個IP地址
- 2.短橫線連接符表示的IP範圍(例如1.2.3.4-1.2.3.9這表示6個IP)
- 3.帶有遮罩的網絡範圍(例如1.2.3.0/24)
- 埠(略)
- 類型:這裡的類型還是有必要解釋的,萬一以後需要添加其他的功能,就可以直接改動一張錶添加新的模塊,然後通過指定類型調用。
通過type調用挿件
定義一張錶,這張錶紀錄挿件的地址以及挿件的類名稱
SCAN_TYPE_TABLE = {
'proxy': {'module':'lib.check_proxy',
'plugin_class':'CheckProxyPlugin'}
}
然後動態導入特定挿件:
def get_pluginclass(type_name):
""""""
try:
try:
plugin_class_path = SCAN_TYPE_TABLE[type_name]
except KeyError:
pprint('[!] No Such Type')
exit()
module_name = plugin_class_path['module']
plugin_class_name = plugin_class_path['plugin_class']
module_tmp = __import__(module_name, fromlist=plugin_class_name)
ret = getattr(module_tmp, plugin_class_name)
return ret
except Exception, E:
traceback.format_exc()
exit()
導入之後就可以通過統一的介面去調用我們的挿件了:
def main():
"""Pr0xy main"""
params = parse_args()
IP = params[0]
PORTS = params[1]
pool = Pool()
plugin_class = get_pluginclass(params[2])
for target in parse_target.generate_target(IP, PORTS):
_ = plugin_class(target)
pool.add_task(_.start)
result_queue = pool.run()
while True:
try:
ret = result_queue.get(timeout=0.3)
# 这里出现了一个处理结果的函数
process_result(ret)
except Empty:
pass
except KeyboardInterrupt:
pprint('ByeByeBye')
收工總結:
當然我承認這個工具實在是太簡單了!到現在,這個工具已經可以拿出去用了,而且,你如果覺得僅僅是掃描代理有點太low了吧!那麼我們這一節探討了架構與拓展方面的東西,我相信如果想把它拓展一下做成一個全網掃描特定埠,自動打EXP的挿件真的是相當容易對不對?也就是照著上面的挿件的樣子寫一個挿件,然後填一下挿件錶,就可以直接去用了。這算是對上一篇文章的一個比較好的交代了吧。其實作為一個並不是職業開發的WEB狗,寫安全工具應該算是一個不大不小的挑戰了吧,我也是啊,一致夢想有一套專屬的自己開發的滲透集成平臺,最近在大家的幫助下經過了無數次重構,終於完成了框架,已經填了很多個挿件自己在使用,下一步準備嘗試部署在多個服務器上形成集羣,我相信再有一段時間,自己的夢想就實現了~筆者水准實在是不高,而且也不是專業開發,如果程式碼寫的辣眼睛,也請有需要的讀者多多包涵。
GITHUB地址:https://github.com/VillanCh/pr0xy
*本文原創作者:VillanCh,本文屬FreeBuf原創獎勵計畫,未經許可禁止轉載