java opc-ua 连接kepserver 读写、订阅操作

2年前 (2022) 程序员胖胖胖虎阿
655 0 0

一.配置服务端

第一步,下载kepserver,这个在网上都能下到,有需求的也可以找我,我这边提供一个无限时长的

第二步,配置kepserver

java opc-ua 连接kepserver 读写、订阅操作

点击 Administration 然后会在电脑右下角的任务栏看到kepserver的图标, 右键点击出来选项之后点击opcua设置进入下面页面

java opc-ua 连接kepserver 读写、订阅操作

双击蓝色区域

java opc-ua 连接kepserver 读写、订阅操作

如下页面可以根据需求自行配置,

然后再次点击右下角图标,点击设置

进入下面页面 

java opc-ua 连接kepserver 读写、订阅操作

右键administrators添加用户,设置用户名密码,点击确认,然后进入kepserver的界面,右键项目-属性,设置ua匿名账户登录

java opc-ua 连接kepserver 读写、订阅操作

本文后续的代码全部使用匿名登录进行,

二.源代码

导入依赖

<!--Milo客户端的依赖-->
<dependency>
    <groupId>org.eclipse.milo</groupId>
    <artifactId>sdk-client</artifactId>
    <version>0.6.3</version>
</dependency>
<!--Milo客户端的依赖-->
<dependency>
    <groupId>org.eclipse.milo</groupId>
    <artifactId>sdk-server</artifactId>
    <version>0.6.3</version>
</dependency>

编写代码

public class Demo {
    private final static String endPointUrl = "opc.tcp://172.16.1.224:49320";
    /**
     *
     *
     * 创建OPC UA客户端
     * @return
     * @throws Exception
     */
    private static OpcUaClient createClient() throws Exception {
        //opc ua服务端地址

        Path securityTempDir = Paths.get(System.getProperty("java.io.tmpdir"), "security");
        Files.createDirectories(securityTempDir);
        if (!Files.exists(securityTempDir)) {
            throw new Exception("unable to create security dir: " + securityTempDir);
        }
        return OpcUaClient.create(endPointUrl,
                endpoints ->
                        endpoints.stream()
                                .filter(e -> e.getSecurityPolicyUri().equals(SecurityPolicy.None.getUri()))
                                .findFirst(),
                configBuilder ->
                        configBuilder
                                .setApplicationName(LocalizedText.english("eclipse milo opc-ua client"))
                                .setApplicationUri("urn:eclipse:milo:examples:client")
                                //访问方式
                                .setIdentityProvider(new AnonymousProvider())
                                .setRequestTimeout(UInteger.valueOf(500))
                                .build()
        );
    }
    /**
     * 遍历树形节点
     *
     * @param client OPC UA客户端
     * @param uaNode 节点
     * @throws Exception
     */
    private static void browseNode(OpcUaClient client, UaNode uaNode) throws Exception {
        List<? extends UaNode> nodes;
        if (uaNode == null) {
            nodes = client.getAddressSpace().browseNodes(Identifiers.ObjectsFolder);
        } else {
            nodes = client.getAddressSpace().browseNodes(uaNode);
        }
        for (UaNode nd : nodes) {
            //排除系统行性节点,这些系统性节点名称一般都是以"_"开头
            if (Objects.requireNonNull(nd.getBrowseName().getName()).contains("_")) {
                continue;
            }
            System.out.println("Node= " + nd.getBrowseName().getName());
            browseNode(client, nd);
        }
    }
    /**
     * 读取节点数据
     *
     * @param client OPC UA客户端
     * @throws Exception
     */
    private static void readNode(OpcUaClient client) throws Exception {
        int namespaceIndex = 2;
        String identifier = "通道 1.设备 1.标记 1";
        //节点
        NodeId nodeId = new NodeId(namespaceIndex, identifier);
        //读取节点数据
        DataValue value = client.readValue(0.0, TimestampsToReturn.Neither, nodeId).get();
        //标识符
        identifier = String.valueOf(nodeId.getIdentifier());
        System.out.println(identifier + ": " + String.valueOf(value.getValue().getValue()));
    }
    /**
     * 写入节点数据
     *
     * @param client
     * @throws Exception
     */
    private static void writeNodeValue(OpcUaClient client) throws Exception {
        //节点
        NodeId nodeId = new NodeId(2, "通道 1.设备 1.标记 4");
        Short i = 3;
        //创建数据对象,此处的数据对象一定要定义类型,不然会出现类型错误,导致无法写入
        DataValue nowValue = new DataValue(new Variant(i), null, null);
        //写入节点数据
        StatusCode statusCode = client.writeValue(nodeId, nowValue).join();
        System.out.println("结果:" + statusCode.isGood());
    }/**
     * 订阅(单个)
     *
     * @param client
     * @throws Exception
     */
    private static void subscribe(OpcUaClient client) throws Exception {
        AtomicInteger a=new AtomicInteger();
        //创建发布间隔1000ms的订阅对象
        client
                .getSubscriptionManager()
                .createSubscription(1000.0)
                .thenAccept(t -> {
                    //节点
                    NodeId nodeId = new NodeId(2, "通道 1.设备 1.标记 4");
                    ReadValueId readValueId = new ReadValueId(nodeId, AttributeId.Value.uid(), null, null);
                    //创建监控的参数
                    MonitoringParameters parameters = new MonitoringParameters(UInteger.valueOf(a.getAndIncrement()), 1000.0, null, UInteger.valueOf(10), true);
                    //创建监控项请求
                    //该请求最后用于创建订阅。
                    MonitoredItemCreateRequest request = new MonitoredItemCreateRequest(readValueId, MonitoringMode.Reporting, parameters);
                    List<MonitoredItemCreateRequest> requests = new ArrayList<>();
                    requests.add(request);
                    //创建监控项,并且注册变量值改变时候的回调函数。
                    t.createMonitoredItems(
                            TimestampsToReturn.Both,
                            requests,
                            (item, id) -> item.setValueConsumer((it, val) -> {
                                System.out.println("nodeid :" + it.getReadValueId().getNodeId());
                                System.out.println("value :" + val.getValue().getValue());
                            })
                    );
                }).get();

        //持续订阅
        Thread.sleep(Long.MAX_VALUE);
    }
    /**
     * 批量订阅
     *
     * @param client
     * @throws Exception
     */
//    private static void managedSubscriptionEvent(OpcUaClient client) throws Exception {
//        final CountDownLatch eventLatch = new CountDownLatch(1);
//
//        //处理订阅业务
//        handlerNode(client);
//
//        //持续监听
//        eventLatch.await();
//    }

    /**
     * 处理订阅业务
     *
     * @param client OPC UA客户端
     */
    private static void handlerNode(OpcUaClient client) {
        try {
            //创建订阅
            ManagedSubscription subscription = ManagedSubscription.create(client);

            //你所需要订阅的key
            List<String> key = new ArrayList<>();
            key.add("通道 1.设备 1.标记 4");
            key.add("通道 1.设备 1.标记 1");

            List<NodeId> nodeIdList = new ArrayList<>();
            for (String s : key) {
                nodeIdList.add(new NodeId(2, s));
            }

            //监听
            List<ManagedDataItem> dataItemList = subscription.createDataItems(nodeIdList);
            for (ManagedDataItem managedDataItem : dataItemList) {
                managedDataItem.addDataValueListener((t) -> {
                    System.out.println(managedDataItem.getNodeId().getIdentifier().toString() + ":" + t.getValue().getValue().toString());
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    /**
     * 自定义订阅监听
     */
    private static class CustomSubscriptionListener implements UaSubscriptionManager.SubscriptionListener {

        private OpcUaClient client;


        CustomSubscriptionListener(OpcUaClient client) {
            this.client = client;
        }

        public void onKeepAlive(UaSubscription subscription, DateTime publishTime) {
            System.out.println("onKeepAlive");
        }

        public void onStatusChanged(UaSubscription subscription, StatusCode status) {
            System.out.println("onStatusChanged");
        }

        public void onPublishFailure(UaException exception) {
            System.out.println("onPublishFailure");
        }

        public void onNotificationDataLost(UaSubscription subscription) {
            System.out.println("onNotificationDataLost");
        }

        /**
         * 重连时 尝试恢复之前的订阅失败时 会调用此方法
         * @param uaSubscription 订阅
         * @param statusCode 状态
         */
        public void onSubscriptionTransferFailed(UaSubscription uaSubscription, StatusCode statusCode) {
            System.out.println("恢复订阅失败 需要重新订阅");
            //在回调方法中重新订阅
            handlerNode(client);
        }
    }
    /**
     * 批量订阅
     *
     * @param client
     * @throws Exception
     */
    private static void managedSubscriptionEvent(OpcUaClient client) throws Exception {
        final CountDownLatch eventLatch = new CountDownLatch(1);

        //添加订阅监听器,用于处理断线重连后的订阅问题
        client.getSubscriptionManager().addSubscriptionListener(new CustomSubscriptionListener(client));

        //处理订阅业务
        handlerNode(client);

        //持续监听
        eventLatch.await();
    }




测试

    public static void main(String[] args) throws Exception {
        OpcUaClient client = createClient();
        client.connect().get();
//        browseNode(client,null);
//        readNode(client);
        writeNodeValue(client);
//        subscribe(client);
//      managedSubscriptionEvent(client);
    }
}.

代码部分可以直接复制进行测试,因为用的匿名登录,所以不需要用户名密码,kepserver设置只需要打开匿名登录就可以了

自动重连测试

java opc-ua 连接kepserver 读写、订阅操作

本文代码部分转载于Java使用Milo实现OPC UA客户端_逛窑子的李靖的博客-CSDN博客_java opc ua

版权声明:程序员胖胖胖虎阿 发表于 2022年9月3日 下午2:24。
转载请注明:java opc-ua 连接kepserver 读写、订阅操作 | 胖虎的工具箱-编程导航

相关文章

暂无评论

暂无评论...