據對于任何一個企業來說都是非常重要的,為了保證數據 ETL 流程的質量及效率,很多公司都會引入 ETL 工具。目前 ETL 工具有很多,但是針對 ETL 測試的測試工具在業界卻比較少見。這是為什么呢?
主要是因為在日常 ETL 測試過程中會遇到很多問題,特別是 Hive SQL 類測試的問題:
(1)測試以手動測試為主,缺少自動化工具;
(2)缺少與數據質量相關的分析工具;
(3)測試中需要重復編寫SQL語句,效率較低;
(4)運行SQL語句耗時太長,嚴重拖慢測試進度;
(5)Shell窗口中的查詢結果不易保存,HUE的查詢結果易過期且需要手動操作保存;
(6)數據同步場景及ETL場景下,需要對比源表和目標表一致性,缺少對比工具;
(7)實時數據處理場景對數據時效性要求高,測試時場景難以模擬,問題難以復現;
(8)常用測試場景下的用例重復,例如,對拉鏈表測試、MapReduce腳本的測試缺少通用的測試覆蓋用例;
(9)缺少Hive與HBase一致性對比工具。
總的來說,大數據測試存在門檻高、測試效率較低、測試覆蓋不全、測試場景不易復現、 測試問題難以定位等問題,今天異步君就給大家介紹一款可以解決上述問題的超好用大數據測試工具——easy_data_test。
easy_data_test
easy_data_test 是用Python編寫的,目前它的主要功能有:
(1)支持單表數據量、列空值數據量、列非空值數據量、列最大值、列最小值、列不同值、不同值數據量查詢,支持對表結構、任意 select 語句的查詢,支持表基本信息查詢、值域分析、異常值分析、手機號合規性分析、ID 合規性分析。
(2)支持雙表數據量對比、列空值數據量對比、列非空值數據量對比、表結構對比、Hive 雙表一致性對比、Hive 與 HBase 一致性對比。
(3)支持查看主備集群及庫切換、庫表集群信息。
(4)支持實時查看歷史執行命令及結果,以 HTML 頁面展示全表分析,以 HTML 頁面展示值域,以 HTML 頁面展示 Hive 雙表一致性分析結果。
(5)支持拉鏈表通用測試(判斷拉鏈表是否斷鏈,判斷拉鏈表日期正確性,對比拉鏈表與臨時表數據量、數值)
easy_data_test功能如此強大,是如何實現的呢?異步君拿到了獨家資料,從這個工具的模塊設計到技術選型、再到代碼實現通通都有,干貨滿滿!下面就讓我們來詳細看看吧。
模塊設計
話不多說,直接上圖:
easy_data_test 模塊設計
如圖所示,用戶運行 easy_data_test 工具后,可以通過 ./easy_data_test --help 命令查看所有非交互式命令,使用 stdin.readline() 來獲取用戶輸入的語句。
如果沒有指定 -f 或者 -e 就會進入交互式命令行模式。進入交互式模式后,程序通過 raw_input 函數獲取用戶輸入的命令,并根據命令的首個關鍵字執行對應的函數。函數中封裝了一條或多條 SQL 語句,通過 Presto 讀取 Hive 元數據,或通過 pyHive 的 Hive 模塊連接 Hive。
部分執行結果展示在終端頁面,并存儲在查詢歷史命令及結果文件中。部分命令執行完畢后會生成 url,通過瀏覽器可以查看相應命令的執行結果。
不同的首個關鍵字對應不同的功能模塊,通常每個功能模塊包含多個執行函數。
技術選型
業內常用的 Python 連接 Hive 的工具有 Presto、pyHive、impala 及 pyhs2 等。設計人員在經過執行效率及公司現有環境綜合比較后,最終選擇了 Presto 作為查詢主要工具。
Presto 是由 Facebook 公司開發的、一個運行在多臺服務器上的分布式查詢引擎。本身雖然并不存儲數據,但是可以接入多種數據源(Hive、HBase、Oracle、MySQL、Kafka、Redis 等),并且支持跨數據源的級聯查詢。
Presto 所使用的執行模式與 Hive 有根本的不同,大部分場景下 Presto 比 Hive 快一個數量級。Presto 接受請求后,立即執行,全內存并行計算;Hive 需要用 Yarn 做資源調度,為了接受查詢,需要先申請資源,啟動進程,并且采用 MapReduce 計算模型,中間結果會保存在磁盤上,所以速度就相對較慢。
使用 easy_data_test 過程中,有時會發現 Presto 存在部分 HiveQL 不兼容問題,例如,show tables like a* 命令無法執行,表結構查詢與預期不符,執行切換庫操作報錯時不拋出異常等。
考慮到 Presto 部分功能缺失帶來的問題,于是設計人員選擇 pyHive 作為功能彌補工具,在執行特定 SQL 語句時會切換到 pyHive 去連接 Hive 執行。
區別于 Hive,需要格外注意的是,Presto 不支持隱式轉換。例如,Hive 會成功執行以下語句:
select count(1) from sample_label where label <> ";
但是使用 Presto 執行就會報告以下錯誤;
PrestoUserError(type=USER_ERROR, name=SYNTAX_ERROR, message="line 1:83: '<>' cannot be applied to integer, varchar(0)", query_id=20191106_024551_ 01370_8ukjc)
報錯原因是,label 列定義的類型為 integer,在使用 Presto 時直接將該列與空字符做比較,Presto 不支持隱式轉換。對于該類問題,使用時只需將 label 顯式轉換為 string 或者 varchar 類型即可解決。
select count(1) from sample_label where cast(label as string) <> ";
從以上內容已經不難看出研發人員的匠心,最后我們直接來看一看 easy_data_test 的模塊代碼。
模塊代碼
入口函數如下:
1 def main(options, hostname, port):
2 setup_cqlruleset(options.cqlmodule)
3 setup_cqldocs(options.cqlmodule)
4 # 初始化歷史執行命令及結果文件
5 init_history()
6 if options.file is None:
7 stdin = None
8 else:
9 try:
10 encoding, bom_size=get_file_encoding_bomsize(options.file)
11 stdin = codecs.open(options.file, 'r', encoding)
12 stdin.seek(bom_size)
13 except IOError, e:
14 sys.exit("Can't open %r: %s" % (options.file, e))
15
16 try:
17 # 初始化Shell,該類繼承自cmd.Cmd
18 shell=Shell(hostname,
19 port,
20 database=options.database,
21 username=options.username,
22 password=options.password,
23 stdin=stdin,
24 tty=options.tty,
25 completekey=options.completekey,
26 single_statement=options.execute,
27 connect_timeout=DEFAULT_CONNECT_TIMEOUT_SECONDS)
28 except KeyboardInterrupt:
29 sys.exit('Connection aborted.')
30 except Exception, e:
31 sys.exit('Connection error: %s' % (e,))
32 if options.debug:
33 shell.debug = True
34
35 # 通過交互式命令循環處理
36 shell.cmdloop()
37 batch_mode = options.file or options.execute
38 if batch_mode and shell.statement_error:
39 sys.exit(2)
40
41
42 if __name__ == '__main__':
43 main(*read_options(sys.argv[1:], os.environ))
通過 Presto 連接 Hive 的代碼如下:
1 import prestodb
2 conn=prestodb.dbapi.connect(
3 host= ip,
4 port=8443,
5 user='username',
6 catalog='hive',
7 schema='default',
8 http_scheme='https',
9 auth=prestodb.auth.BasicAuthentication("username", "username的密碼"),
10 )
11 conn._http_session.verify = './presto.pem' #身份認證相關文件
12 cur = conn.cursor()
13 cur.execute('SELECT * FROM system.runtime.nodes')
14 rows=cur.fetchall()
15 print rows
為了使用 Hive 查詢全表數據量,需要執行 SQL 語句 select count(*) from tablename。使用工具代碼封裝后,查詢表數據只需要使用 count tablename 即可實現,且查詢效率比使用原生 Hive 快一個數量級。查詢結果保存在歷史文件中,可以使用相關命令查看。
關于單表模塊的命令有多個,count 命令的代碼如下:
1 class SigleTableAnalysis(cmd.Cmd):
2 # count table,查詢表數據量,支持傳入where條件
3 @classmethod
4 def do_count(self, parsed, print_command=True, print_res=True):
5 try:
6 table_name=parsed.split(' ')[1].strip(';')
7 statement = 'select count(1) from %s' % table_name
8 if len(parsed.split(' ')) >=3 and parsed.split(' ')[2].strip() == 'where': 9 wherecondition = ' '.join(parsed.split(' ')[3:])
10 statement=statement + ' where ' + wherecondition
11 status, res = perform_simple_statement(statement, detail=False, print_
command=print_command, print_res=print_res)
12 if not print_res:
13 return status, res
14 except IndexError as e:
15 print('please check whether your command is right')
16 except Exception as e:
17 import traceback
18 print('%s detail: %s' % (str(e), traceback.format_exc()))
其他模塊的代碼與 count 命令的代碼相似,雙表查詢模塊、拉鏈表測試模塊、數據質量分析模塊會在單表模塊的基礎上進行封裝,所以設計會更復雜一些,由于篇幅有限,異步君沒法在這里為大家更多地展示了。想要深入了解的小伙伴,推薦閱讀《機器學習測試入門與實踐》。
機器學習測試入門與實踐
作者:艾輝
內容簡介:
本書全面且系統地介紹了機器學習測試技術與質量體系建設,能夠幫助讀者了解機器學習是如何工作的,了解機器學習的質量保障是如何進行的。
工程開發人員和測試工程師通過閱讀本書,可以系統化地了解大數據測試、特征測試及模型評估等知識;算法工程師通過閱讀本書,可以學習模型評測的方法和拓寬模型工程實踐的思路;技術專家和技術管理者通過閱讀本書,可以了解機器學習質量保障與工程效能的建設方案。
們在瀏覽器中看到的畫面優美的界面,其實是由瀏覽器,對網頁內容做了解釋后,才呈現出來的類似word的WYSIWYG
實質它是一段HTML代碼,加JS、CSS等。如果把網頁比作一個人,那么HTML便是他的骨架,JS便是他的肌肉,CSS便是它的衣服。最重要的部分是存在于HTML中的。
用一個例子來爬網頁
?import urllib2
?response=urllib2.urlopen("http://www.baidu.com")
?print response.read()
這個例子中,真正的程序就兩行,把它保存成demo.py,進入該文件的目錄,執行如下命令查看運行結果。
?python demo.py
可以看到這個網頁的源碼已經被爬下來了。
網頁爬取方法
那么我們來分析這幾行Python代碼:
?import urllib2
?response=urllib2.urlopen("http://www.baidu.com")
?print response.read()
第2行
?response=urllib2.urlopen("http://www.baidu.com")
?首先我們調用的是urllib2庫里面的urlopen方法,傳入一個URL
?這個網址是百度首頁,協議是HTTP協議
?當然你也可以把HTTP換做FTP,FILE,HTTPS 等等,只是代表了一種訪問控制協議
三個參數
urlopen一般接受三個參數,它的參數如下:
?urlopen(url, data, timeout)
?第一個參數url即為URL,第二個參數data是訪問URL時要傳送的數據,第三個timeout是設置超時時間。
?第一個參數URL是必須要傳送的,本例里傳送了百度的URL
?第二三個參數可以不傳,
?data默認為空None
?timeout默認為socket._GLOBAL_DEFAULT_TIMEOUT
執行urlopen方法之后,返回一個response對象,返回信息便保存在這里面。
?print response.read()
?response對象有一個read方法,可以返回獲取到的網頁內容。
第3行
?print response.read()
?如果不加read直接打印會是什么?
?直接打印出了該對象的描述
?<addinfourlat 139728495260376 whose fp=<socket._fileobjectobject at 0x7f1513fb3ad0>>
?所以一定要加read方法
構造Requset
其實上面的urlopen參數,可以傳入一個request請求
它其實就是一個Request類的實例,構造時需要傳入Url,Data等等的內容。
比如上面的三行代碼,我們可以這么改寫
?import urllib2
?request=urllib2.Request("http://www.baidu.com")
?response=urllib2.urlopen(request)
?print response.read()
運行結果是完全一樣的,只不過中間多了一個request對象
?推薦大家這么寫,因為在構建請求時還需要加入好多內容
?通過構建一個request,服務器響應請求得到應答,這樣顯得邏輯上清晰明確。
import urllib.parse # Python3 url編碼 print(urllib.parse.quote("天安門")) # Python3 url解碼 print(urllib.parse.unquote("%E5%A4%A9%E5%AE%89%E9%97%A8"))
my_web.py(修改)
*請認真填寫需求信息,我們會在24小時內與您取得聯系。