Add aaa-12 modify the connect driver from layer to RemoteDevice

RCA:
SOL:
修改人:maxiaonan
检视人:
This commit is contained in:
maxiaonan 2019-08-21 17:17:11 +08:00
parent 2b52e9fb44
commit e897f3214d
1 changed files with 212 additions and 51 deletions

View File

@ -23,12 +23,28 @@ import org.opendaylight.yang.gen.v1.http.www.huawei.com.netconf.vrp.huawei.devm.
import org.opendaylight.yang.gen.v1.http.www.huawei.com.netconf.vrp.huawei.devm.rev181123.devm.MemoryInfos;
import org.opendaylight.yang.gen.v1.http.www.huawei.com.netconf.vrp.huawei.devm.rev181123.devm.cpuinfos.CpuInfo;
import org.opendaylight.yang.gen.v1.http.www.huawei.com.netconf.vrp.huawei.devm.rev181123.devm.memoryinfos.MemoryInfo;
import org.opendaylight.yang.gen.v1.http.www.huawei.com.netconf.vrp.huawei.ifm.rev181123.Ifm;
import org.opendaylight.yang.gen.v1.http.www.huawei.com.netconf.vrp.huawei.ifm.rev181123.IfmcommAdminStaType;
import org.opendaylight.yang.gen.v1.http.www.huawei.com.netconf.vrp.huawei.ifm.rev181123.ifm.Interfaces;
import org.opendaylight.yang.gen.v1.http.www.huawei.com.netconf.vrp.huawei.ifm.rev181123.ifm.interfaces.Interface;
import org.opendaylight.yang.gen.v1.http.www.huawei.com.netconf.vrp.huawei.ifm.rev181123.ifm.interfaces._interface.ipv4config.am4cfgaddrs.Am4CfgAddr;
import org.opendaylight.yang.gen.v1.http.www.huawei.com.netconf.vrp.huawei.ifm.rev181123.ifm.interfaces._interface.ipv6config.am6cfgaddrs.Am6CfgAddr;
import org.opendaylight.yang.gen.v1.urn.cmcc.cmhi.adaptation.layer.device.status.rev190809.NodeCpu;
import org.opendaylight.yang.gen.v1.urn.cmcc.cmhi.adaptation.layer.device.status.rev190809.NodeMemory;
import org.opendaylight.yang.gen.v1.urn.cmcc.cmhi.adaptation.layer.device.status.rev190809.cpu.status.CpuInfosBuilder;
import org.opendaylight.yang.gen.v1.urn.cmcc.cmhi.adaptation.layer.device.status.rev190809.mem.status.MemoryInfosBuilder;
import org.opendaylight.yang.gen.v1.urn.cmcc.cmhi.adaptation.layer.tp.extension.rev190809.TpExt;
import org.opendaylight.yang.gen.v1.urn.cmcc.cmhi.adaptation.layer.tp.extension.rev190809.TpExtBuilder;
import org.opendaylight.yang.gen.v1.urn.cmcc.cmhi.adaptation.layer.tp.extension.rev190809.tp.status.TpInfos;
import org.opendaylight.yang.gen.v1.urn.cmcc.cmhi.adaptation.layer.tp.extension.rev190809.tp.status.TpInfosBuilder;
import org.opendaylight.yang.gen.v1.urn.cmcc.cmhi.adaptation.layer.tp.extension.rev190809.tp.status.tp.infos.IpAddress;
import org.opendaylight.yang.gen.v1.urn.cmcc.cmhi.adaptation.layer.tp.extension.rev190809.tp.status.tp.infos.IpAddressBuilder;
import org.opendaylight.yang.gen.v1.urn.cmcc.cmhi.adaptation.layer.tp.extension.rev190809.tp.status.tp.infos.TpAdminStatus;
import org.opendaylight.yang.gen.v1.urn.cmcc.cmhi.adaptation.layer.tp.extension.rev190809.tp.status.tp.infos.TpAdminStatusBuilder;
import org.opendaylight.yang.gen.v1.urn.cmcc.cmhi.huawei.connector.rev190809.ConnectDevice;
import org.opendaylight.yang.gen.v1.urn.cmcc.cmhi.huawei.connector.rev190809.ConnectorInfo;
import org.opendaylight.yang.gen.v1.urn.cmcc.cmhi.huawei.connector.rev190809.connector.info.RemoteDevices;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Host;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.PortNumber;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.*;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.credentials.Credentials;
@ -42,12 +58,16 @@ import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPoint;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import java.util.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.*;
import static org.opendaylight.controller.md.sal.binding.api.DataObjectModification.ModificationType.DELETE;
@ -65,38 +85,32 @@ public class HuaweiNetconfSpeaker implements DataTreeChangeListener<ConnectorInf
.create(NetworkTopology.class)
.child(Topology.class,
new TopologyKey(new TopologyId(TopologyNetconf.QNAME.getLocalName())));
private ConcurrentHashMap<String, ScheduledExecutorService> threads = new ConcurrentHashMap<>(8);
private CopyOnWriteArrayList deviceList = new CopyOnWriteArrayList();
/**
* Scheduled Task
* Scheduled Task thread factory
*/
private ThreadFactory timeThreadFactory = new ThreadFactoryBuilder().setNameFormat("huawei-pool-d%").setDaemon(true).build();
/**
* Common Thread Pool
*/
private ScheduledExecutorService pool = new ScheduledThreadPoolExecutor(5, timeThreadFactory,
new ThreadPoolExecutor.AbortPolicy());
public HuaweiNetconfSpeaker(DataBroker dataBroker) {
this.dataBroker = dataBroker;
startTask();
}
public void connectDevice(ConnectDevice connectDevice) {
/**create netconf node with connect device information */
final NetconfNodeBuilder netconfNodeBuilder = new NetconfNodeBuilder();
netconfNodeBuilder.setHost(new Host(new IpAddress(connectDevice.getIp().getIpv4Address())))
netconfNodeBuilder.setHost(new Host(new org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress(connectDevice.getIp().getIpv4Address())))
.setPort(new PortNumber(connectDevice.getPort().getValue()))
.setTcpOnly(true)
.setSchemaless(true);
String username = connectDevice.getUserName();
String password = connectDevice.getPassword();
if (Strings.isNullOrEmpty(username) || Strings.isNullOrEmpty(password)) {
LOG.error( "Empty Username:" + connectDevice.getUserName() + " or Password:" + connectDevice.getPassword()
LOG.error("Empty Username:" + connectDevice.getUserName() + " or Password:" + connectDevice.getPassword()
+ ". In TCP or SSH mode, you must provide valid username and password.");
return ;
return;
}
final Credentials credentials =
new LoginPasswordBuilder().setPassword(password).setUsername(username).build();
@ -109,7 +123,7 @@ public class HuaweiNetconfSpeaker implements DataTreeChangeListener<ConnectorInf
final Node node = new NodeBuilder()
.setKey(new NodeKey(nodeId))
.setNodeId(nodeId)
.addAugmentation(NetconfNode.class,netconfNode)
.addAugmentation(NetconfNode.class, netconfNode)
.build();
final WriteTransaction transaction = dataBroker.newWriteOnlyTransaction();
@ -120,15 +134,18 @@ public class HuaweiNetconfSpeaker implements DataTreeChangeListener<ConnectorInf
@Override
public void onFailure(Throwable throwable) {
LOG.error("Failed to created NetconfNode={}", netconfNode, throwable);
stopTask(connectDevice.getId().getValue());
}
@Override
public void onSuccess(Void avoid) {
LOG.debug("NetconfNode={} created successfully", netconfNode);
startTask(connectDevice.getId().getValue());
}
});
}
public boolean disConnectDevice(String netconfNodeId) {
final WriteTransaction transaction = dataBroker.newWriteOnlyTransaction();
final InstanceIdentifier<Node> iid = NetconfIidFactory.netconfNodeIid(netconfNodeId);
@ -137,6 +154,7 @@ public class HuaweiNetconfSpeaker implements DataTreeChangeListener<ConnectorInf
try {
LOG.debug("Deleting netconf node: {}", netconfNodeId);
transaction.commit().get();
stopTask(netconfNodeId);
return true;
} catch (final InterruptedException | ExecutionException e) {
LOG.error("Unable to remove node with Iid {}", iid, e);
@ -164,8 +182,6 @@ public class HuaweiNetconfSpeaker implements DataTreeChangeListener<ConnectorInf
public void onSessionInitiated(BindingAwareBroker.ProviderContext session) {
LOG.info("HuaweiNetconfSpeaker Session Initiated");
this.mountService = session.getSALService(MountPointService.class);
this.dataBroker = session.getSALService(DataBroker.class);
}
@ -181,10 +197,10 @@ public class HuaweiNetconfSpeaker implements DataTreeChangeListener<ConnectorInf
+ "old Toaster: {}, new Toaster: {}", change.getRootPath().getRootIdentifier(),
oldEntry, newEntry);
ArrayList<ChangeType> changeTypes = checkWriteType(oldEntry, newEntry);
for (ChangeType changeType :changeTypes) {
if(changeType.getType().equals(ChangeType.Type.ADD)){
for (ChangeType changeType : changeTypes) {
if (changeType.getType().equals(ChangeType.Type.ADD)) {
connectDevice(changeType.getDevices());
}else if(changeType.getType().equals(ChangeType.Type.DELETE)){
} else if (changeType.getType().equals(ChangeType.Type.DELETE)) {
disConnectDevice(changeType.getDevices().getId().getValue());
}
}
@ -199,31 +215,31 @@ public class HuaweiNetconfSpeaker implements DataTreeChangeListener<ConnectorInf
private ArrayList<ChangeType> checkWriteType(ConnectorInfo oldEntry, ConnectorInfo newEntry) {
ArrayList<ChangeType> changeTypes = new ArrayList<>();
List<RemoteDevices> oldEntryRemoteDevices;
if(Objects.isNull(oldEntry)){
if (Objects.isNull(oldEntry)) {
oldEntryRemoteDevices = new ArrayList<>();
}else{
} else {
oldEntryRemoteDevices = oldEntry.getRemoteDevices();
}
List<RemoteDevices> newEntryRemoteDevices;
if(Objects.isNull(newEntry)){
if (Objects.isNull(newEntry)) {
newEntryRemoteDevices = new ArrayList<>();
}else{
} else {
newEntryRemoteDevices = newEntry.getRemoteDevices();
}
while (oldEntryRemoteDevices.iterator().hasNext()){
while (oldEntryRemoteDevices.iterator().hasNext()) {
RemoteDevices devices = oldEntryRemoteDevices.iterator().next();
if(newEntryRemoteDevices.contains(devices)){
if (newEntryRemoteDevices.contains(devices)) {
oldEntryRemoteDevices.remove(devices);
newEntryRemoteDevices.remove(devices);
}
}
for(RemoteDevices newRemote :newEntryRemoteDevices){
for (RemoteDevices newRemote : newEntryRemoteDevices) {
ChangeType newChange = new ChangeType();
newChange.setDevices(newRemote);
newChange.setType(ChangeType.Type.ADD);
changeTypes.add(newChange);
}
for(RemoteDevices remoteDevice:oldEntryRemoteDevices){
for (RemoteDevices remoteDevice : oldEntryRemoteDevices) {
ChangeType changeType = new ChangeType();
changeType.setDevices(remoteDevice);
changeType.setType(ChangeType.Type.DELETE);
@ -232,22 +248,38 @@ public class HuaweiNetconfSpeaker implements DataTreeChangeListener<ConnectorInf
return changeTypes;
}
private void startTask() {
pool.scheduleAtFixedRate(()->{
for(Object device :deviceList){
readInfoFromDevice("");
private void startTask(String nodeId) {
ScheduledExecutorService pool = new ScheduledThreadPoolExecutor(2, timeThreadFactory,
new ThreadPoolExecutor.AbortPolicy());
pool.scheduleAtFixedRate(() -> {
readInfoFromDevice(nodeId);
}, 0, 10000, TimeUnit.MILLISECONDS);
if (Objects.nonNull(threads.get(nodeId))) {
if (!threads.get(nodeId).isShutdown()) {
threads.get(nodeId).shutdownNow();
}
},0,10000,TimeUnit.MILLISECONDS);
}
threads.put(nodeId, pool);
}
private void readInfoFromDevice(String s) {
private void stopTask(String nodeId) {
if (threads.containsKey(nodeId)) {
ScheduledExecutorService pool = threads.get(nodeId);
if (Objects.nonNull(pool)) {
pool.shutdown();
}
threads.remove(nodeId);
}
}
private void readInfoFromDevice(String nodeId) {
final Optional<MountPoint> hwNodeOptional = mountService.getMountPoint(NETCONF_TOPO_IID
.child(Node.class, new NodeKey(new NodeId(s))));
.child(Node.class, new NodeKey(new NodeId(nodeId))));
Preconditions.checkArgument(hwNodeOptional.isPresent(),
"Unable to locate mountpoint: %s, not mounted yet or not configured",
s);
nodeId);
final MountPoint hwNode = hwNodeOptional.get();
// Get the DataBroker for the mounted node
@ -270,18 +302,40 @@ public class HuaweiNetconfSpeaker implements DataTreeChangeListener<ConnectorInf
// get/checkedGet makes the call synchronous
cupInfos = hwNodeReadTx.read(LogicalDatastoreType.OPERATIONAL, iid).checkedGet();
} catch (ReadFailedException e) {
throw new IllegalStateException("Unexpected error reading data from " + s, e);
throw new IllegalStateException("Unexpected error reading data from " + nodeId, e);
}
List<CpuInfo> ifcList = new ArrayList<CpuInfo>();
if(cupInfos.isPresent()){
List<org.opendaylight.yang.gen.v1.urn.cmcc.cmhi.adaptation.layer.device.status.rev190809.cpu.status.CpuInfos> ifcList = new ArrayList<>();
if (cupInfos.isPresent()) {
List<CpuInfo> cpuInfoList = cupInfos.get().getCpuInfo();
for(CpuInfo cpuInfo :cpuInfoList){
for (CpuInfo cpuInfo : cpuInfoList) {
LOG.info("Show cpu with serial {},cpu usage is {} ",
cpuInfo.getKey().getPosition(), cpuInfo.getSystemCpuUsage());
CpuInfosBuilder builder = new CpuInfosBuilder();
org.opendaylight.yang.gen.v1.urn.cmcc.cmhi.adaptation.layer.device.status.rev190809.cpu.status.CpuInfos temp = builder.setUsageRate(cpuInfo.getSystemCpuUsage().shortValue()).build();
ifcList.add(temp);
}
}else {
} else {
LOG.info("No data present on path '{}' for mountpoint: {}",
iid, s);
iid, nodeId);
}
//write cpu information to layer
InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.cmcc.cmhi.adaptation.layer.device.status.rev190809.cpu.status.CpuInfos> cpuIid = NETCONF_TOPO_IID.child(Node.class).
augmentation(NodeCpu.class).
child(org.opendaylight.yang.gen.v1.urn.cmcc.cmhi.adaptation.layer.device.status.rev190809.cpu.status.CpuInfos.class);
for (int i = 0; i < ifcList.size(); i++) {
final WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
writeTransaction.put(LogicalDatastoreType.OPERATIONAL, cpuIid, ifcList.get(i));
Futures.addCallback(writeTransaction.submit(), new FutureCallback<Void>() {
@Override
public void onFailure(Throwable throwable) {
LOG.error("Write cpu information failed." + throwable.getMessage());
}
@Override
public void onSuccess(Void avoid) {
LOG.info("Write cpu information success.");
}
});
}
// Identifier path is equivalent to:
@ -294,19 +348,126 @@ public class HuaweiNetconfSpeaker implements DataTreeChangeListener<ConnectorInf
// get/checkedGet makes the call synchronous
memoryInfosOptional = hwNodeReadTx.read(LogicalDatastoreType.OPERATIONAL, memoryIid).checkedGet();
} catch (ReadFailedException e) {
throw new IllegalStateException("Unexpected error reading data from " + s, e);
throw new IllegalStateException("Unexpected error reading data from " + nodeId, e);
}
List<MemoryInfo> memoryList = new ArrayList<MemoryInfo>();
if(memoryInfosOptional.isPresent()){
List<org.opendaylight.yang.gen.v1.urn.cmcc.cmhi.adaptation.layer.device.status.rev190809.mem.status.MemoryInfos> memoryList = new ArrayList<>();
if (memoryInfosOptional.isPresent()) {
List<MemoryInfo> memoryInfoList = memoryInfosOptional.get().getMemoryInfo();
for(MemoryInfo memoryInfo :memoryInfoList){
for (MemoryInfo memoryInfo : memoryInfoList) {
LOG.info("Show memory with serial {},cpu usage is {} ",
memoryInfo.getKey().getPosition(), memoryInfo.getDoMemoryUsage());
MemoryInfosBuilder builder = new MemoryInfosBuilder();
org.opendaylight.yang.gen.v1.urn.cmcc.cmhi.adaptation.layer.device.status.rev190809.mem.status.MemoryInfos temp = builder.setMemoryTotal(memoryInfo.getOsMemoryTotal().shortValue()).
setUsageRate(memoryInfo.getOsMemoryUsage().shortValue()).build();
memoryList.add(temp);
}
}else {
} else {
LOG.info("No data present on path '{}' for mountpoint: {}",
iid, s);
iid, nodeId);
}
//write memory information to layer
InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.cmcc.cmhi.adaptation.layer.device.status.rev190809.mem.status.MemoryInfos> customMemoryIID = NETCONF_TOPO_IID.child(Node.class).
augmentation(NodeMemory.class).
child(org.opendaylight.yang.gen.v1.urn.cmcc.cmhi.adaptation.layer.device.status.rev190809.mem.status.MemoryInfos.class);
for (int i = 0; i < memoryList.size(); i++) {
final WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
writeTransaction.put(LogicalDatastoreType.OPERATIONAL, customMemoryIID, memoryList.get(i));
Futures.addCallback(writeTransaction.submit(), new FutureCallback<Void>() {
@Override
public void onFailure(Throwable throwable) {
LOG.error("Write memory information failed." + throwable.getMessage());
}
@Override
public void onSuccess(Void avoid) {
LOG.info("Write memory information success.");
}
});
}
InstanceIdentifier<Interfaces> interfacesIID =
InstanceIdentifier.create(Ifm.class).child(Interfaces.class);
Optional<Interfaces> interfacesOptional;
try {
// Read from a transaction is asynchronous, but a simple
// get/checkedGet makes the call synchronous
interfacesOptional = hwNodeReadTx.read(LogicalDatastoreType.OPERATIONAL, interfacesIID).checkedGet();
} catch (ReadFailedException e) {
throw new IllegalStateException("Unexpected error reading data from " + nodeId, e);
}
List<TpInfos> tpInfosList = new ArrayList<>();
if (interfacesOptional.isPresent()) {
List<Interface> interfaceList = interfacesOptional.get().getInterface();
for (Interface intf : interfaceList) {
LOG.info("Show memory with serial {},ipv4 config is {} ",
intf.getKey().getIfName(), intf.getIpv4Config());
TpInfosBuilder tpInfosBuilder = new TpInfosBuilder();
List<IpAddress> ipsOnIntf = getiplist(intf);
tpInfosBuilder.setIpAddress(ipsOnIntf);
tpInfosBuilder.setMtu(intf.getIfMtu().shortValue());
tpInfosBuilder.setTpName(intf.getIfName().getValue());
tpInfosBuilder.setTpNumber(intf.getIfNumber());
tpInfosBuilder.setTpAdminStatus(buildTpAdmin(intf.getIfAdminStatus()));
tpInfosBuilder.setTpPhyType(intf.getIfPhyType().getName());
tpInfosList.add(tpInfosBuilder.build());
}
} else {
LOG.info("No data present on path '{}' for mountpoint: {}",
iid, nodeId);
}
//write interface information to layer
InstanceIdentifier<TpInfos> tpinfosIID = NETCONF_TOPO_IID.child(Node.class).
child(TerminationPoint.class).augmentation(TpExt.class).child(TpInfos.class);
for (int i = 0; i < tpInfosList.size(); i++) {
final WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
writeTransaction.put(LogicalDatastoreType.OPERATIONAL, tpinfosIID, tpInfosList.get(i));
Futures.addCallback(writeTransaction.submit(), new FutureCallback<Void>() {
@Override
public void onFailure(Throwable throwable) {
LOG.error("Write tp information information failed." + throwable.getMessage());
}
@Override
public void onSuccess(Void avoid) {
LOG.info("Write tp information information success.");
}
});
}
}
private TpAdminStatus buildTpAdmin(IfmcommAdminStaType ifAdminStatus) {
TpAdminStatusBuilder builder = new TpAdminStatusBuilder();
TpAdminStatus.AdminStatus adminstatus = TpAdminStatus.AdminStatus.forValue(ifAdminStatus.getIntValue());
builder.setAdminStatus(adminstatus);
return builder.build();
}
private List<IpAddress> getiplist(Interface intf) {
List<Am4CfgAddr> list = intf.getIpv4Config().getAm4CfgAddrs().getAm4CfgAddr();
List<Am6CfgAddr> am6CfgAddrList = intf.getIpv6Config().getAm6CfgAddrs().getAm6CfgAddr();
List<IpAddress> addressList = new ArrayList<>();
for (Am4CfgAddr cfg : list) {
IpAddressBuilder builder = new IpAddressBuilder();
org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress ip = new org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress(Ipv4Address.getDefaultInstance(cfg.getIfIpAddr().getValue()));
builder.setIp(ip);
IpAddress tmpAddress = builder.build();
addressList.add(tmpAddress);
}
for (Am6CfgAddr am6CfgAddr : am6CfgAddrList) {
IpAddressBuilder builder = new IpAddressBuilder();
org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress ip = new org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress(Ipv6Address.getDefaultInstance(am6CfgAddr.getIfIp6Addr().getValue()));
builder.setIp(ip);
IpAddress tmpAddress = builder.build();
addressList.add(tmpAddress);
}
return addressList;
}
}