侧边栏壁纸
博主头像
王一川博主等级

努力成为一个不会前端的全栈工程师

  • 累计撰写 70 篇文章
  • 累计创建 20 个标签
  • 累计收到 39 条评论

目 录CONTENT

文章目录

IBM MQ/MB 第五幕:HTTP

王一川
2021-11-01 / 0 评论 / 1 点赞 / 1,083 阅读 / 7,715 字
温馨提示:
本文最后更新于 2022-06-02,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

涉及 HTTP & MQ 结合的无非就两大应用,通过 HTTP 对外提供接口服务和基于 HTTP 的数据上报,这篇文章将实现这两个功能

一、HTTP 服务接口

需求:客户需要数据库某张表的今日实时数据,但是客户很菜不会写SQL,想要给一个 HTTP 接口查看即可,不要求什么花里胡哨的报表展示。

1.1 准备数据

create table TEST_HTTP_MQ
(
    FFID   VARCHAR2(10),
    ACID   VARCHAR2(10),
    FLNO   VARCHAR2(10),
    HBDATE DATE
);

INSERT INTO HCSS_AIRPORT_ODS.TEST_HTTP_MQ (FFID, ACID, FLNO, HBDATE) VALUES ('114628', 'CA', '3350', TO_DATE('2021-11-01 16:11:28', 'YYYY-MM-DD HH24:MI:SS'));
INSERT INTO HCSS_AIRPORT_ODS.TEST_HTTP_MQ (FFID, ACID, FLNO, HBDATE) VALUES ('114629', 'SC', '8686', TO_DATE('2021-11-01 14:11:47', 'YYYY-MM-DD HH24:MI:SS'));
INSERT INTO HCSS_AIRPORT_ODS.TEST_HTTP_MQ (FFID, ACID, FLNO, HBDATE) VALUES ('114630', '3U', '8633', TO_DATE('2021-10-01 16:12:14', 'YYYY-MM-DD HH24:MI:SS'));

1.2 flow 设计

拓扑接口如下

image-20211101170026160

逻辑为:起一个 HTTP 服务检测一个地址,当有请求时触发 flow,Compute 里去数据库中查询数据并封装成 JSON 一份返回作为 HTTP 的 Response,另一份加入一个标签写入 MQ 做备份,起到日志的作用。

1.2.1 HTTP Input

配置 HTTP 请求地址:/api/getAFDS,最终的 HTTP 请求地址为:http://ip:port/api/getAFDS

image-20211101170204249

1.2.2 Compute

这里封装 SQL,封装查询结果

CREATE COMPUTE MODULE HTTP_Compute
	CREATE FUNCTION Main() RETURNS BOOLEAN
	BEGIN
		-- CALL CopyMessageHeaders();
		-- CALL CopyEntireMessage();
		DECLARE SQL CHARACTER;
		CREATE FIELD OutputLocalEnvironment.TEMP.PMS;
		CREATE FIELD OutputRoot.JSON.Data.data IDENTITY(JSON.Array);
		DECLARE XSJC REFERENCE TO OutputLocalEnvironment.TEMP.PMS;
		DECLARE I INTEGER 1;
		DECLARE SYSDATE CHARACTER;
		-- 获取当前处理时间
		SET SYSDATE = CAST(CURRENT_TIMESTAMP AS CHARACTER FORMAT 'yyyyMMddHHmmss');
		-- 待执行的SQL
		SET SQL = 'SELECT FFID, ACID, FLNO, HBDATE
				   FROM HCSS_AIRPORT_ODS.TEST_HTTP_MQ
				   WHERE TRUNC(HBDATE) = TRUNC(SYSDATE)';
		-- 将 SQL 放入数据库中执行,并获取查询结果
		SET XSJC.Row[I] =PASSTHRU (SQL TO Database.orcl);
		-- 封装 SQL 查询结果
		SET OutputRoot.JSON.Data.Updatetime = SYSDATE;
		SET OutputRoot.JSON.Data.msg = '成功';
		SET OutputRoot.JSON.Data.success = 'true';
		WHILE XSJC.Row[I].FFID IS NOT NULL DO
			SET OutputRoot.JSON.Data.data.Item[I].FFID = XSJC.Row[I].FFID;	
			SET OutputRoot.JSON.Data.data.Item[I].ACID = XSJC.Row[I].ACID;
			SET OutputRoot.JSON.Data.data.Item[I].FLNO = XSJC.Row[I].FLNO;
			SET OutputRoot.JSON.Data.data.Item[I].HBDATE = XSJC.Row[I].HBDATE;		
			SET I = I+1;
		END WHILE;
		RETURN TRUE;
	END;

	CREATE PROCEDURE CopyMessageHeaders() BEGIN
		DECLARE I INTEGER 1;
		DECLARE J INTEGER;
		SET J = CARDINALITY(InputRoot.*[]);
		WHILE I < J DO
			SET OutputRoot.*[I] = InputRoot.*[I];
			SET I = I + 1;
		END WHILE;
	END;

	CREATE PROCEDURE CopyEntireMessage() BEGIN
		SET OutputRoot = InputRoot;
	END;
END MODULE;

1.2.3 Compute1

用来确认报文日志中的报文来源于哪个接口,起到日志作用

CREATE COMPUTE MODULE HTTP_Compute1
	CREATE FUNCTION Main() RETURNS BOOLEAN
	BEGIN
		-- CALL CopyMessageHeaders();
		-- CALL CopyEntireMessage();
		DECLARE STYP CHARACTER;
		SET OutputRoot.JSON.* = InputRoot.JSON.*;
		-- AFDS 为标识符,用来确认报文日志中的报文来源于哪个接口,可自行命名。
		SET STYP= 'AFDS' ; 
		SET OutputRoot.JSON.Data.styp=STYP;
		RETURN TRUE;
	END;

	CREATE PROCEDURE CopyMessageHeaders() BEGIN
		DECLARE I INTEGER 1;
		DECLARE J INTEGER;
		SET J = CARDINALITY(InputRoot.*[]);
		WHILE I < J DO
			SET OutputRoot.*[I] = InputRoot.*[I];
			SET I = I + 1;
		END WHILE;
	END;

	CREATE PROCEDURE CopyEntireMessage() BEGIN
		SET OutputRoot = InputRoot;
	END;
END MODULE;

1.3 测试

部署 flow 即可测试 [OK]

image-20211101171340017

二、HTTP 数据上报

需求:根据数据协议开发数据上报服务

2.1 数据协议

接口说明:xxx 数据上报

请求地址:http://ip:port/scimsservice/FoUnpackResult

请求参数:xml 格式

基础数据项数据名称数据类型说明
消息名称MessageName字符型,最大50个字符FoUnpackResult
消息生成时间MessageTime日期型参见信息定义中的日期说明
行李条码号CheckID字符型,最大50个字符IATA或RFID等行李编号
扫描图像的GuidImageGuid字符型,最大100个字符扫描图像的唯一标识
开包位置CheckPos字符型,最大50个字符比如:A01、YZ01,如无位置信息传送空值
操作员IDOptId字符型,最大50个字符00000001
开包时间UnpackTime日期型参见信息定义中的日期说明
违禁品信息ContrabandInfo----违禁品信息,可没有,可多个
违禁品类型Type字符型,最大50个字符违禁品类型名称
数量Count整型数量
处理方式Handle字符型,最大50个字符处理方式描述
违禁品拍照图像下载地址PhotoURL字符型,最大200个字符违禁品拍照图像URL,可没有,可多个

数据样例:

<Envelope>
    <Header>
        <MessageName>FoUnpackResult</MessageName>
        <MessageTime>2012-02-07T14:30:35+08:00</MessageTime>
    </Header>
    <Body>
        <Record>
            <CheckId>38291821342</CheckId>
            <ImageGuid>ImageGuid1</ImageGuid>
            <CheckPos>开包间</CheckPos>
            <OptId/>
            <UnpackTime>2012-02-07T14:30:35+08:00</UnpackTime>
            <ContrabandInfo>
                <Type>锂电池</Type>
                <Count>1</Count>
                <Handle>旅客自行处理</Handle>
            </ContrabandInfo>
            <ContrabandInfo>
                <Type>危险物品</Type>
                <Count>1</Count>
                <Handle>移交</Handle>
            </ContrabandInfo>
            <PhotoURL>http://192.168.101.1:8000/1.jpg</PhotoURL>
            <PhotoURL>http://192.168.101.1:8000/2.jpg</PhotoURL>
        </Record>
    </Body>
</Envelope>

2.2 数据库准备

根据数据协议创建对应的表存放将来上报的数据

create table SCIMS_FLEXONE_FoUnpackResult
(
    MessageName varchar2(50),
    MessageTime varchar2(50),
    CheckType   varchar2(50),
    CheckID     varchar2(100),
    ImageGuid   varchar2(100),
    CheckPos    varchar2(50),
    OptId       varchar2(50),
    UnpackTime  varchar2(50),
    Type        varchar2(50),
    Count       number,
    Handle      varchar2(50),
    PhotoURL    varchar2(200)
);

2.3 flow 设计

为了提高系统的稳定性,减少因数据解析错误导致数据上报失败,因此将数据上报和解析数据独立出来,数据上报后直接入队列,之后从队列取数据解析入库;这样可以保证数据上报一定是成功的。

2.3.1 上报 flow

3E9956FEC14DB4F936DA8BF3605C3BC5

Computer2

这里不做任何处理,将数据原封不动的写入队列

CREATE COMPUTE MODULE SCIMS_HTTP_Compute
	CREATE FUNCTION Main() RETURNS BOOLEAN
	BEGIN
		-- CALL CopyMessageHeaders();
		-- CALL CopyEntireMessage();
		SET OutputRoot.XMLNSC.* = InputRoot.XMLNSC.*;
		RETURN TRUE;
	END;

	CREATE PROCEDURE CopyMessageHeaders() BEGIN
		DECLARE I INTEGER 1;
		DECLARE J INTEGER;
		SET J = CARDINALITY(InputRoot.*[]);
		WHILE I < J DO
			SET OutputRoot.*[I] = InputRoot.*[I];
			SET I = I + 1;
		END WHILE;
	END;

	CREATE PROCEDURE CopyEntireMessage() BEGIN
		SET OutputRoot = InputRoot;
	END;
END MODULE;

Computer3

封装 Response 给上报方一定的反馈

CREATE COMPUTE MODULE SCIMS_HTTP_Compute1
	CREATE FUNCTION Main() RETURNS BOOLEAN
	BEGIN
		-- CALL CopyMessageHeaders();
		-- CALL CopyEntireMessage();
		SET OutputRoot.JSON.Data.resCode = '0';
		SET OutputRoot.JSON.Data.resMsg = '成功';
		RETURN TRUE;
	END;

	CREATE PROCEDURE CopyMessageHeaders() BEGIN
		DECLARE I INTEGER 1;
		DECLARE J INTEGER;
		SET J = CARDINALITY(InputRoot.*[]);
		WHILE I < J DO
			SET OutputRoot.*[I] = InputRoot.*[I];
			SET I = I + 1;
		END WHILE;
	END;

	CREATE PROCEDURE CopyEntireMessage() BEGIN
		SET OutputRoot = InputRoot;
	END;
END MODULE;

2.3.2 解析 flow

1268C0780010AB06B633D3354565DB11

这里的逻辑就是简单的取数据解析,同时写入另一个队列做一个备份(MQ Output3),解析失败入失败队列(MQ Output2)定期检查数据。

数据解析逻辑如下:

CREATE COMPUTE MODULE SCIMS_EXEC_Compute2
	CREATE FUNCTION Main() RETURNS BOOLEAN
	BEGIN
		-- CALL CopyMessageHeaders();
		-- CALL CopyEntireMessage();
		DECLARE I INTEGER 1;
		insert into Database.orcl.HCSS_AIRPORT_ODS.SCIMS_FLEXONE_FoUnpackResult(
			messagename, 
			messagetime, 
			checktype, 
			checkid, 
			imageguid, 
			checkpos, 
			optid, 
			unpacktime, 
			type, 
			count, 
			handle, 
			photourl
			) values (
			InputRoot.XMLNSC.Envelope.Header.MessageName,
			InputRoot.XMLNSC.Envelope.Header.MessageTime,
			InputRoot.XMLNSC.Envelope.Body.Record.CheckType,
			InputRoot.XMLNSC.Envelope.Body.Record.CheckID,		
			InputRoot.XMLNSC.Envelope.Body.Record.ImageGuid,
			InputRoot.XMLNSC.Envelope.Body.Record.CheckPos,
			InputRoot.XMLNSC.Envelope.Body.Record.OptId,
			InputRoot.XMLNSC.Envelope.Body.Record.UnpackTime,
			InputRoot.XMLNSC.Envelope.Body.Record.ContrabandInfo[I].Type,
			InputRoot.XMLNSC.Envelope.Body.Record.ContrabandInfo[I].Count,
			InputRoot.XMLNSC.Envelope.Body.Record.ContrabandInfo[I].Handle,
			InputRoot.XMLNSC.Envelope.Body.Record.PhotoURL[I]    
			);
			SET I = I + 1;
		WHILE InputRoot.XMLNSC.Envelope.Body.Record.ContrabandInfo[I].Type IS NOT NULL DO
			insert into Database.orcl.HCSS_AIRPORT_ODS.SCIMS_FLEXONE_FoUnpackResult(
				messagename, 
				messagetime, 
				checktype, 
				checkid, 
				imageguid, 
				checkpos, 
				optid, 
				unpacktime, 
				type, 
				count, 
				handle, 
				photourl
			) values (
				InputRoot.XMLNSC.Envelope.Header.MessageName,
				InputRoot.XMLNSC.Envelope.Header.MessageTime,
				InputRoot.XMLNSC.Envelope.Body.Record.CheckType,
				InputRoot.XMLNSC.Envelope.Body.Record.CheckID,
				InputRoot.XMLNSC.Envelope.Body.Record.ImageGuid,
				InputRoot.XMLNSC.Envelope.Body.Record.CheckPos,
				InputRoot.XMLNSC.Envelope.Body.Record.OptId,
				InputRoot.XMLNSC.Envelope.Body.Record.UnpackTime,
				InputRoot.XMLNSC.Envelope.Body.Record.ContrabandInfo[I].Type,
				InputRoot.XMLNSC.Envelope.Body.Record.ContrabandInfo[I].Count,
				InputRoot.XMLNSC.Envelope.Body.Record.ContrabandInfo[I].Handle,
				InputRoot.XMLNSC.Envelope.Body.Record.PhotoURL[I]    
			);
			SET I = I + 1;
		END WHILE;
		RETURN TRUE;
	END;

	CREATE PROCEDURE CopyMessageHeaders() BEGIN
		DECLARE I INTEGER 1;
		DECLARE J INTEGER;
		SET J = CARDINALITY(InputRoot.*[]);
		WHILE I < J DO
			SET OutputRoot.*[I] = InputRoot.*[I];
			SET I = I + 1;
		END WHILE;
	END;

	CREATE PROCEDURE CopyEntireMessage() BEGIN
		SET OutputRoot = InputRoot;
	END;
END MODULE;

2.4 测试

postman 模拟发送一个上报请求,查看数据库有无数据插入即可

image-20211101173538730

1

评论区