• <menu id="sssag"></menu>
  • <menu id="sssag"></menu>
  • 一次業務代碼的流式重構

    封面圖

    業務場景

    目標 ~> 調度 ~> MQ ~> 引擎,就是生產者消費者模型,非常簡單。

    為了提高性能,調度需要將一個大目標拆分為多個子任務,啟動多個引擎并發地去執行。

    舉個例子,用戶輸入一個 A 段目標 1.0.0.0/8(2^24=16,777,216),設置了全端口(1-65535)三種協議(ICMP UDP TCP)掃描,假定引擎每次處理 10W 目標,200 個端口時效率最佳。

    老代碼

    SpiltTargets ~> SpiltPorts ~> SpiltProtocol ~> MQ,代碼抽象為三個函數,順序執行,每個階段執行完才能進入下個階段,中間產生的所有數據都保存在內存中,然后全部推送到 MQ。

    SpiltTargets 后,子任務數量變為 16,777,216 / 100,000 = 168

    接著 SpiltPorts 后,65535 / 200 = 328,此時子任務數量變為 168 * 328 = 55104

    最終 SpiltProtocol,子任務數量 55104 * 2 + 168(ICMP 協議無端口)= 110376,高達 11W 之多

    優點

    • 代碼實現簡單
    • 純 CPU 運算,整個拆分過程快,由 MQ 持久化消息,不擔心重啟丟數據(不過不能在拆分的時候重啟)

    缺點

    • 調度內存占用高(一行字符串最終變為 11W 行字符串)
    • MQ 消息數量太多,內存占用大的同時,還可能丟消息

    后續

    其實按照 10W 目標,200 個端口拆分,整個系統還算撐得住,直到后來我們的系統把客戶的路由器給打掛了(看來有時候不能一味的追求快)。

    為了掃描變慢點,拆分粒度改為了 256 個目標,50 個端口,最終產生消息數 65535 * 1311 * 2 + 65535 = 171,898,305,都上億了,調度和 MQ 都頂不住了!

    當時的修改是引入二級隊列,一級還是按照 10W 拆分,后臺協程定時從一級獲取消息按照 256 拆分為二級,引擎從二級隊列獲取子任務。

    流式重構

    雖然上面的二級隊列解決了問題,但是我感覺并不是很完美,為什么要等到所有的流程都走完才推消息呢?為什么要先推消息,然后拉回來,再推出去呢?

    受到 go-zero/stream 啟發,我決定將其流式化重構,去除業務代碼,核心的骨架如下。

    type Stream struct {
    	source <-chan []string // 一批目標
    	done   chan struct{}   // 退出信號
    }
    
    func NewStream(targets []string) Stream {
        // 此處使用無緩沖的 channel 演示,具體可以根據上下游的處理能力設置 buffer
    	source := make(chan []string) 
    	done := make(chan struct{})
    
    	go func() {
    		defer close(source)
    
    		for _, v := range targets {
    			select {
    			case <-done: // 監聽退出信號
    				return
    			default:
    			}
    			source <- []string{v} // 傳遞給下一階段
    		}
    	}()
    
    	return Stream{
    		source: source,
    		done:   done,
    	}
    }
    
    func (s Stream) SpiltTargets(chunk int) Stream {
    	source := make(chan []string)
    
    	var buf []string
    
    	go func() {
    		defer close(source)
    
    		for msg := range s.source {
    			select {
    			case <-s.done:
    				return
    			default:
    			}
    
    			// 緩存 chunk 數量的目標后,傳遞給下一階段,算法很簡單,此處忽略
    			for _, v := range msg {
    				buf = append(buf, v)
    			}
    			source <- buf
    		}
    	}()
    
    	return Stream{
    		source: source,
    		done:   s.done,
    	}
    }
    
    func (s Stream) SpiltPorts(chunk int) Stream {
    	// 邏輯和 SpiltTargets 一致,只不過對端口做處理
    }
    
    func (s Stream) PushMQ(protocol []string) Stream {
    	// 邏輯基本和上面一致
    
    	// 有個策略,只有在當前隊列消息數少于 500 時,才推送
    	// 不能一股腦全推送,否則就和老代碼效果一樣了(拆分速度遠遠快于消費速度)
    }
    
    func (s Stream) Wait() {
    	// 等待所有的子任務都拆分完成
    	for range s.source {
    	}
    
    	// 關閉 MQ 連接
    }
    
    func (s Stream) Tidy() {
    	// 通知所有階段都退出
    	close(s.done)
    
    	// 刪除隊列
    
    	// 關閉 MQ 連接
    }
    

    使用效果如下:

    func main() {
    	s := NewStream([]string{"1.0.0.0/8"})
    	s.SpiltTargets(10000).PushMQ("icmp").SpiltPorts(200).PushMQ("udp", "tcp").Wait()
    }
    

    代碼效果看起來還不錯,就像水一樣徐徐流過,而不像之前水庫泄洪似的。

    優點

    • 不用擔心拆分粒度,省內存,MQ 消息數可控
    • 方便拓展,根據業務需求可以加入更多的處理階段

    缺點

    • 整個拆分過程伴隨著任務運行一直存在,不能利用 MQ 持久化
    • 只能處理局部數據,不能處理全量數據

    后續

    由于持久化方案太復雜,目前暫時沒做,不過問題不大,重啟這種非正常情況畢竟機率非常小

    總結

    Go 的 channel 非常適合做流式處理。

    在設計時不僅僅要完成功能,還要適當考慮性能,雖然這樣花費的時間可能稍微多點。

    參考

    https://github.com/kevwan/stream

    posted @ 2022-03-12 21:34  YahuiAn  閱讀(18)  評論(0編輯  收藏  舉報
    国产在线码观看超清无码视频,人妻精品动漫H无码,十大看黄台高清视频,国产在线无码视频一区二区三区,国产男女乱婬真视频免费,免费看女人的隐私超爽,狠狠色狠狠色综合久久蜜芽