5.订阅服务使用说明

GitHub地址:https://github.com/caict-4iot-dev/bif-core-subscribe-tool

5.1 简介

订阅服务提供了星火链网-底层区块链平台区块、交易订阅功能,同时还提供了使用示例说明。中国信通院秉持开源开放的理念,将星火订阅服务面向社区和公众完全开源,助力全行业伙伴提升数据价值流通的效率,实现数据价值转化。

image-20211103160915945

图1 订阅服务逻辑架构图

5.2 方法说明

5.2.1 AddChainResponseMethod

  • 接口说明

该接口用于接收响应消息。
  • 调用方法

BlockChainAdapter AddChainResponseMethod(Overlay.ChainMessageType.CHAIN_HELLO_VALUE,new BlockChainAdapterProc() {public void ChainMethod (byte[] msg, int length) {}});
  • 请求参数

序号

类型

说明

1

Overlay.ChainMessageType.CHAIN_HELLO_VALUE

hello数据类型

2

BlockChainAdapterProc(){}

定义消息处理方法

  • 示例

//接收hello响应消息
BlockChainAdapter.AddChainResponseMethod(Overlay.ChainMessageType.CHAIN_HELLO_VALUE, new BlockChainAdapterProc() {
            public void ChainMethod (byte[] msg, int length) {
                OnChainHello(msg, length);
            }
        });
  • 响应结果

变量

类型

描述

self_addr

string

连接的节点地址

ledger_version

int64

区块版本号

monitor_version

int64

监控程序版本号

chain_version

string

星火链程序版本号

timestamp

int64

时间戳

address_prefix

string

账号前缀

5.2.2 AddChainMethod

  • 接口说明

该接口用于接收请求消息,请求消息是区块链实时推送的消息。
  • 调用方法

BlockChainAdapter AddChainMethod(Overlay.ChainMessageType.CHAIN_TX_ENV_STORE_VALUE,,new BlockChainAdapterProc() {public void ChainMethod (byte[] msg, int length) {}});
  • 请求参数

序号

类型

说明

1

Overlay.ChainMessageType.CHAIN_TX_ENV_STORE_VALUE/CHAIN_LEDGER_TXS_VALUE

交易/区块数据类型

2

BlockChainAdapterProc(){}

定义消息处理方法

  • 示例

 //接收区块链实时推送的消息-交易信息
BlockChainAdapter.AddChainMethod(Overlay.ChainMessageType.CHAIN_TX_ENV_STORE_VALUE, new BlockChainAdapterProc() {
            public void ChainMethod (byte[] msg, int length) {
                OnChainTxEnvStore(msg, length);
            }
        });

 //接收区块链实时推送的消息-区块信息
BlockChainAdapter.AddChainMethod(Overlay.ChainMessageType.CHAIN_LEDGER_TXS_VALUE, new BlockChainAdapterProc() {
            public void ChainMethod (byte[] msg, int length) {
                OnChainLedgerTxs(msg, length);
            }
        });

5.2.3 Send

  • 接口说明

该接口订阅指定账号交易信息。
  • 调用方法

BlockChainAdapter Send(Overlay.ChainMessageType.CHAIN_SUBSCRIBE_TX.getNumber(), tx.build().toByteArray()));
  • 请求参数

序号

类型

说明

1

Overlay.ChainMessageType.CHAIN_SUBSCRIBE_TX.getNumber()

交易/区块数据类型

  • 示例

//订阅指定账号交易信息
String srcAddress="";
Overlay.ChainSubscribeTx.Builder tx=Overlay.ChainSubscribeTx.newBuilder();
  tx.addAddress(srcAddress);
if (!BlockChainAdapter.Send(Overlay.ChainMessageType.CHAIN_SUBSCRIBE_TX.getNumber(),  tx.build().toByteArray())) {
            System.out.println("send tx failed");
  }

5.3 订阅示例

​ 该示例主要实现功能为订阅did:bid:efHmvWpqfVzv5rLNSMrhEdNegLz9AcnS账号交易信息。

  • 示例

    private static chain_test chainTest;
    /**
     * BIF-Core-SDK
     */
    String httpUrl = "http://test-bif-core.xinghuo.space";
    BIFSDK sdk = BIFSDK.getInstance(httpUrl);
    /**
     * 订阅服务
     */
    String webSocketUrl = "ws://test-bif-core.xinghuo.space:7053";
    boolean isConnected = false;
    /**
     * 订阅账号
     */
    String srcAddress = "did:bid:efHmvWpqfVzv5rLNSMrhEdNegLz9AcnS";

    /**
     * 消息适配器
     */
    private BlockChainAdapter chain_message_one_;
    public static void main(String[] argv) {

        chainTest = new chain_test();
        chainTest.Initialize();
        System.out.println("*****************start chain_message successfully******************");
        //chainTest.Stop();
    }
    public void Stop() {
        chain_message_one_.Stop();
    }

    /**
     * 订阅初始化
     */
    public void Initialize() {
        chain_message_one_ = new BlockChainAdapter(webSocketUrl);
        //接收hello响应消息
        chain_message_one_.AddChainResponseMethod(Overlay.ChainMessageType.CHAIN_HELLO_VALUE, new BlockChainAdapterProc() {
            public void ChainMethod (byte[] msg, int length) {
                OnChainHello(msg, length);
            }
        });
        //接收区块链实时推送的消息-交易信息
        chain_message_one_.AddChainMethod(Overlay.ChainMessageType.CHAIN_TX_ENV_STORE_VALUE, new BlockChainAdapterProc() {
            public void ChainMethod (byte[] msg, int length) {
                OnChainTxEnvStore(msg, length);
            }
        });
        //接收区块链实时推送的消息-区块信息
        chain_message_one_.AddChainMethod(Overlay.ChainMessageType.CHAIN_LEDGER_TXS_VALUE, new BlockChainAdapterProc() {
            public void ChainMethod (byte[] msg, int length) {
                OnChainLedgerTxs(msg, length);
            }
        });

        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //发送hello请求
        Overlay.ChainHello.Builder chain_hello = Overlay.ChainHello.newBuilder();
        chain_hello.setTimestamp(System.currentTimeMillis());
        if (!chain_message_one_.Send(Overlay.ChainMessageType.CHAIN_HELLO.getNumber(), chain_hello.build().toByteArray())) {
            System.out.println("send hello failed");
        }
        //订阅指定账号交易信息
        Overlay.ChainSubscribeTx.Builder tx=Overlay.ChainSubscribeTx.newBuilder();
        tx.addAddress(srcAddress);
        if (!chain_message_one_.SendTxSubscribe(Overlay.ChainMessageType.CHAIN_SUBSCRIBE_TX.getNumber(), tx)) {
            System.out.println("send tx failed");
        }
    }
    private void OnChainHello(byte[] msg, int length) {
        try {
            Overlay.ChainStatus chain_status = Overlay.ChainStatus.parseFrom(msg);
            System.out.println(chain_status);
            isConnected = true;
        } catch (Exception e) {
            System.out.println(e.getMessage());
            e.printStackTrace();
        }
    }

    private void OnChainTxEnvStore(byte[] msg, int length) {
        try {
            Chain.TransactionEnvStore  envStore = Chain.TransactionEnvStore.parseFrom(msg);
            String hash = ToHex.bytesToHex(envStore.getHash().toByteArray());
            System.out.println("OnChainTxEnvStore hash:" + hash);
            if (envStore.getErrorCode() == 0) {
                System.out.println(sdk.getUrl());
                BIFTransactionGetInfoRequest request = new BIFTransactionGetInfoRequest();
                request.setHash(ToHex.bytesToHex(envStore.getHash().toByteArray()));
                BIFTransactionGetInfoResponse response = sdk.getBIFTransactionService().getTransactionInfo(request);
                if (response.getErrorCode() == 0) {
                    System.out.println(JsonUtils.toJSONString(response.getResult()));
                } else {
                    System.out.println(JsonUtils.toJSONString(response));
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void OnChainLedgerTxs(byte[] msg, int length) {
        try {
            Overlay.LedgerTxs envStore = Overlay.LedgerTxs.parseFrom(msg);
            Long ledgerLength = envStore.getHeader().getSeq();
            System.out.println("header size:" + ledgerLength);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

5.4 附录:

5.4.1 交易消息

交易是一段发往区块链系统的请求数据,用于部署合约,调用合约接口,创建账号,进行交易等。当交易确认后会产生交易回执,交易回执交易均保存在区块里,用于记录交易执行过程生成的信息,如结果码、事件、消耗的gas量等。用户可以使用交易哈希查询交易回执,判定交易是否完成。查询接口可参见如何使用星火链工具-SDK使用说明章节。

5.4.2 区块消息

最新的区块高度,可根据区块高度查询该区块已打包交易。查询接口可参见如何使用星火链工具-SDK使用说明章节。