1、概述
在工業(yè)物聯(lián)網(wǎng)系統(tǒng)中通常會(huì)由設(shè)備接入來針對某個(gè)區(qū)域內(nèi)的監(jiān)測完成智能化,設(shè)備在接入工業(yè)物聯(lián)網(wǎng)統(tǒng)一接入系統(tǒng)之后會(huì)有攜帶的測點(diǎn)數(shù)據(jù)源源不斷的上報(bào)實(shí)時(shí)監(jiān)測數(shù)據(jù)到工業(yè)物聯(lián)網(wǎng)統(tǒng)一接入系統(tǒng),在某些場景下部分設(shè)備在上報(bào)數(shù)據(jù)之后還需要實(shí)時(shí)對其做預(yù)處理后存儲(chǔ)。在處理這些流數(shù)據(jù)的解決方案中,通常會(huì)用到Kafka Stream 、Flink、Spark Stream等流處理框架和技術(shù),但是這些流處理框架和技術(shù)同時(shí)也帶來了高昂的開發(fā)和運(yùn)維成本。這次給大家分享的是一款開源的時(shí)序庫TDengine,它自帶的流計(jì)算引擎提供了實(shí)時(shí)處理寫入的數(shù)據(jù)流的能力,使用 SQL 定義實(shí)時(shí)流變換,當(dāng)數(shù)據(jù)被寫入流的源表后,數(shù)據(jù)會(huì)被以定義的方式自動(dòng)處理,并根據(jù)定義的觸發(fā)模式向目的表推送結(jié)果。
2、流計(jì)算介紹
在正式開始介紹流計(jì)算的具體使用方法前先介紹一下流計(jì)算使用過程中可能會(huì)用到的數(shù)據(jù)切分查詢和窗口切分查詢與流計(jì)算創(chuàng)建方式。
2.1、數(shù)據(jù)切分查詢
當(dāng)需要按一定的維度對數(shù)據(jù)進(jìn)行切分查詢,然后在切分出來的數(shù)據(jù)集中做后續(xù)計(jì)算時(shí)使用的數(shù)據(jù)切分子句。
partition by part_list;
語法介紹
1)part_list可以是某列、某個(gè)常量、某個(gè)標(biāo)量函數(shù)和他們的組合;
2)partition by位于where之后;
3)partition by可以和窗口切分子句或group by一起使用。
舉個(gè)例子,將t表的數(shù)據(jù)按照c1列進(jìn)行分組,計(jì)算每個(gè)分組內(nèi)c2的和:
select c1,sum(c2) from t1 partition by c1;
簡單了解了數(shù)據(jù)切分查詢之后,再介紹一下窗口切分查詢。
2.2、窗口切分查詢
當(dāng)需要按照窗口切分方式進(jìn)行聚合結(jié)果查詢的時(shí)候可以使用窗口切分子句。
window_clause: {
SESSION(ts_col, tol_val)
| STATE_WINDOW(col)
|
INTERVAL(interval_val [, interval_offset]) [SLIDING (sliding_val)]
[FILL(fill_mod_and_val)]
| EVENT_WINDOW START WITH start_trigger_condition END
WITH end_trigger_condition
| COUNT_WINDOW(count_val[, sliding_val])
???????}
語法介紹
1)窗口子句位于數(shù)據(jù)切分子句之后,不能和group by子句一起使用;
2)窗口子句將數(shù)據(jù)按窗口進(jìn)行切分,對每個(gè)窗口進(jìn)行查詢列表中的表達(dá)式計(jì)算,查詢列表中的表達(dá)式只能包含:
常量;
窗口開始時(shí)間(_wstart)、窗口結(jié)束時(shí)間(_wend)、窗口持續(xù)時(shí)間(_wduration)三個(gè)偽列;
至少包含一個(gè)聚集函數(shù);
包含上面表達(dá)式的表達(dá)式。
3)窗口子句中的where語句可以指定查詢的起止時(shí)間和其他過濾條件。
窗口的種類有五種,分別為:
(1)SESSION:會(huì)話窗口;
話窗口根據(jù)記錄的時(shí)間戳主鍵的值來確定是否屬于同一個(gè)會(huì)話。
1):ts_col的值代表時(shí)間戳主鍵的列名;
2):tol_val的值代表時(shí)間間隔范圍。
舉個(gè)例子:
SELECT _wstart,_wduration,count(*) from t1 session (ts,2s);
(2)STATE_WINDOW:狀態(tài)窗口;
使用整數(shù)、布爾值或字符串來標(biāo)識(shí)產(chǎn)生記錄時(shí)候設(shè)備的狀態(tài)量。產(chǎn)生的記錄如果具有相同的狀態(tài)量數(shù)值則歸屬于同一個(gè)狀態(tài)窗口,數(shù)值改變后該窗口關(guān)閉。
select _wstart,count(*),c1 from t state_window(c1);
(3)INTERVAL:時(shí)間窗口;
時(shí)間窗口又分為了滑動(dòng)時(shí)間窗口和翻轉(zhuǎn)時(shí)間窗口;
1):interval_val的值代表具體的時(shí)間間隔;
2):interval_offset的值代表窗口的偏移量,必須小于interval_val的值;
3):sliding_val的值代表窗口滑動(dòng)的時(shí)間間隔。
當(dāng)interval_val的值和sliding_val值相等的時(shí)候滑動(dòng)時(shí)間窗口也就是翻轉(zhuǎn)時(shí)間窗口。
select _wstart, c1, sum(c2) from t partition by c1 interval(10s);
(4)EVENT_WINDOW:事件窗口;
事件窗口根據(jù)開始條件和結(jié)束條件來判斷窗口,當(dāng)start_trigger_condition滿足時(shí)則窗口開始,當(dāng)end_trigger_condition滿足時(shí)窗口關(guān)閉,事件窗口無法關(guān)閉時(shí),數(shù)據(jù)不會(huì)被輸出。
select _wstart, _wend, count(*) from t event_window start with c2 > 1
??????? end with c2 < 3;
(5)COUNT_WINDOW:計(jì)數(shù)窗口;
計(jì)數(shù)窗口按照固定的數(shù)據(jù)量來劃分窗口,默認(rèn)將數(shù)據(jù)按時(shí)間戳排序。
select _wstart,_wend,count(*) from t count_window(3);
2.3、流計(jì)算創(chuàng)建方式
CREATE STREAM [IF NOT EXISTS] stream_name [stream_options] INTO
stb_name[(field1_name, field2_name [PRIMARY KEY], ...)] [TAGS (create_definition
[, create_definition] ...)] SUBTABLE(expression) AS subquery
stream_options: {
TRIGGER [AT_ONCE | WINDOW_CLOSE | MAX_DELAY time] WATERMARK time
IGNORE EXPIRED
[0|1]
DELETE_MARK
time FILL_HISTORY [0|1]
IGNORE UPDATE [0|1]}
流計(jì)算觸發(fā)模式
流計(jì)算觸發(fā)模式可以通過TRIGGER指令指定,對于非窗口計(jì)算,流計(jì)算的觸發(fā)是實(shí)時(shí)的;對于窗口計(jì)算,有3種觸發(fā)模式,
1)AT_ONCE:寫入立即觸發(fā);
2)WINDOW_CLOSE:窗口關(guān)閉時(shí)觸發(fā);
3)MAX_DELAY time:若窗口關(guān)閉則觸發(fā)計(jì)算,若窗口未關(guān)閉且時(shí)長超過time指定的時(shí)間則觸發(fā)計(jì)算。按照規(guī)定time的值不得小于5s。
流計(jì)算對于過期數(shù)據(jù)的處理策略
對于已關(guān)閉的窗口,再次落入該窗口中的數(shù)據(jù)被標(biāo)記為過期數(shù)據(jù)。對于過期數(shù)據(jù)的處理方式由 IGNORE EXPIRED 選項(xiàng)指定:
1)增量計(jì)算,即 IGNORE EXPIRED 0;
2)直接丟棄,即 IGNORE EXPIRED 1:默認(rèn)配置,忽略過期數(shù)據(jù)。
3、案例介紹
下面通過從表t_ds中統(tǒng)計(jì)設(shè)備的狀態(tài)持續(xù)時(shí)間的一個(gè)小例子介紹流
3.1計(jì)算的使用方法:
3.2、流計(jì)算創(chuàng)建
CREATE stream IF NOT EXISTS t_ds_s TRIGGER at_once ignore expired 0
fill_history 1 INTO t_ds_s_t subtable(concat('t_ds_s_t',d_id))AS SELECT
_wstart,_wend,_wduration,count(*) AS cnt,d_id FROM t_ds PARTITION BY d_id,tbname
??????? event_window START WITH status_text = 0 END WITH status_text = 1;
當(dāng)數(shù)據(jù)流入源表之后,數(shù)據(jù)會(huì)被以定義的方式自動(dòng)處理,并根據(jù)定義的觸發(fā)模式向目的表推送結(jié)果。
3.3、結(jié)果查詢
當(dāng)有數(shù)據(jù)流入到表t_ds中時(shí)符合流計(jì)算中定義的窗口數(shù)據(jù)會(huì)實(shí)時(shí)的統(tǒng)計(jì)到流計(jì)算創(chuàng)建的表t_ds_s_t中。查詢t_ds_s_t表,結(jié)果展示如下:
SELECT _wstart,_wend,_wduration,cnt,d_id FROM t_ds_s_t;
通過SQL得到已經(jīng)統(tǒng)計(jì)完成的數(shù)據(jù)之后就可以針對設(shè)備狀態(tài)變化做出一些靈活的展示,避免了再次引入其他流處理框架帶來的額外的開銷和成本。
4、尾聲
在工業(yè)物聯(lián)網(wǎng)應(yīng)用場景中,在處理時(shí)序數(shù)據(jù)且不需要做過多的復(fù)雜處理時(shí)通過TDengine的流計(jì)算功能是一個(gè)輕量級且高效的解決方案,可以在開發(fā)過程中占用更少資源以更合適的方式處理流數(shù)據(jù)。