-
-
Save krishna81m/70615951adc437ba2bfbe2397595ab24 to your computer and use it in GitHub Desktop.
JMX Monitoring Demo of lettuce 3.4-SNAPSHOT
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.lambdaworks.redis.experimental.mbean; | |
import java.lang.management.ManagementFactory; | |
import java.util.HashMap; | |
import java.util.Map; | |
import java.util.concurrent.ConcurrentHashMap; | |
import java.util.concurrent.TimeUnit; | |
import javax.management.JMException; | |
import javax.management.MBeanServer; | |
import javax.management.ObjectName; | |
import org.springframework.test.util.ReflectionTestUtils; | |
import rx.functions.Action1; | |
import rx.functions.Func1; | |
import com.google.common.collect.Lists; | |
import com.lambdaworks.redis.ConnectionId; | |
import com.lambdaworks.redis.RedisClient; | |
import com.lambdaworks.redis.RedisConnection; | |
import com.lambdaworks.redis.event.Event; | |
import com.lambdaworks.redis.event.connection.*; | |
import io.netty.channel.EventLoopGroup; | |
/** | |
* Demo for a JMX-based client management. | |
* | |
* @author <a href="mailto:[email protected]">Mark Paluch</a> | |
*/ | |
public class Client implements ClientMBean { | |
private static volatile int counter; | |
private final RedisClient redisClient; | |
private final MBeanServer mBeanServer; | |
private final ObjectName self; | |
private final RedisConnection<String, String> monitoringConnection; | |
private Map<ConnectionId, Connection> connections = new ConcurrentHashMap<ConnectionId, Connection>(); | |
private Client(RedisClient redisClient, final MBeanServer mBeanServer) throws JMException { | |
this.redisClient = redisClient; | |
redisClient.getResources().eventBus().get().filter(new Func1<Event, Boolean>() { | |
@Override | |
public Boolean call(Event event) { | |
return event instanceof ConnectionEvent; | |
} | |
}).cast(ConnectionEvent.class).doOnNext(new Action1<ConnectionEvent>() { | |
@Override | |
public void call(ConnectionEvent connectionEvent) { | |
try { | |
handleEvent(connectionEvent); | |
} catch (JMException e) { | |
throw new IllegalStateException(e); | |
} | |
} | |
}).subscribe(); | |
this.mBeanServer = mBeanServer; | |
StringBuffer id = new StringBuffer(); | |
id.append("name=RedisClient,"); | |
id.append("id=" + (counter++)); | |
this.self = ObjectName.getInstance("redis.client.lettuce:" + id); | |
this.mBeanServer.registerMBean(this, self); | |
this.monitoringConnection = redisClient.connect(); | |
this.redisClient.getResources().eventExecutorGroup().scheduleAtFixedRate(new Runnable() { | |
@Override | |
public void run() { | |
Map<ConnectionId, Connection> connections = new HashMap<ConnectionId, Connection>(Client.this.connections); | |
long timeout = TimeUnit.SECONDS.toMillis(10); | |
long clearDisconnectedConnectionsFromBefore = System.currentTimeMillis() - timeout; | |
for (Map.Entry<ConnectionId, Connection> entry : connections.entrySet()) { | |
Long disconnectedSince = entry.getValue().getDisconnectedSince(); | |
if (disconnectedSince != null && disconnectedSince < clearDisconnectedConnectionsFromBefore) { | |
try { | |
mBeanServer.unregisterMBean(entry.getValue().getObjectName()); | |
} catch (JMException e) { | |
e.printStackTrace(); | |
} | |
Client.this.connections.remove(entry.getKey()); | |
} | |
} | |
} | |
}, 10, 10, TimeUnit.SECONDS); | |
} | |
private void handleEvent(ConnectionEvent connectionEvent) throws JMException { | |
Connection connection = connections.get(connectionEvent); | |
if (connection == null) { | |
connection = new Connection(self, connectionEvent); | |
connections.put(connectionEvent, connection); | |
mBeanServer.registerMBean(connection, connection.getObjectName()); | |
} | |
if (connectionEvent instanceof ConnectedEvent) { | |
connection.updateState("connected"); | |
connection.setDisconnectedSince(null); | |
} | |
if (connectionEvent instanceof ConnectionActivatedEvent) { | |
connection.updateState("activated"); | |
connection.setDisconnectedSince(null); | |
} | |
if (connectionEvent instanceof DisconnectedEvent) { | |
connection.updateState("disconnected"); | |
connection.setDisconnectedSince(System.currentTimeMillis()); | |
} | |
if (connectionEvent instanceof ConnectionDeactivatedEvent) { | |
connection.updateState("deactivated"); | |
connection.setDisconnectedSince(System.currentTimeMillis()); | |
} | |
} | |
public static Client create(RedisClient redisClient) throws JMException { | |
return new Client(redisClient, ManagementFactory.getPlatformMBeanServer()); | |
} | |
@Override | |
public int getConnectionCount() { | |
Object o = ReflectionTestUtils.invokeGetterMethod(redisClient, "getChannelCount"); | |
if (o != null) { | |
return (Integer) o; | |
} | |
return -1; | |
} | |
@Override | |
public int getIoThreadCount() { | |
Map<?, EventLoopGroup> eventLoopGroups = (Map) ReflectionTestUtils.getField(redisClient, "eventLoopGroups"); | |
int result = 0; | |
for (EventLoopGroup eventExecutors : eventLoopGroups.values()) { | |
result += Lists.newArrayList(eventExecutors.iterator()).size(); | |
} | |
return result; | |
} | |
@Override | |
public int getWorkerThreadCount() { | |
return Lists.newArrayList(redisClient.getResources().eventExecutorGroup().iterator()).size(); | |
} | |
@Override | |
public String redisInfo() { | |
return monitoringConnection.info(); | |
} | |
@Override | |
public String clientList() { | |
return monitoringConnection.clientList(); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.lambdaworks.redis.experimental.mbean; | |
/**
 * JMX management interface of the client MBean: three read-only counters and two operations that return the
 * output of Redis commands.
 */
public interface ClientMBean {

    /**
     * Connection counter.
     *
     * @return number of connections
     */
    int getConnectionCount();

    /**
     * I/O thread counter.
     *
     * @return number of all I/O threads
     */
    int getIoThreadCount();

    /**
     * Computation (worker) thread counter.
     *
     * @return number of computation threads
     */
    int getWorkerThreadCount();

    /**
     * Runs the Redis {@code INFO} command.
     *
     * @return The result of the Redis {@code INFO} command
     */
    String redisInfo();

    /**
     * Runs the Redis {@code CLIENT LIST} command.
     *
     * @return The result of the Redis {@code CLIENT LIST} command
     */
    String clientList();
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.lambdaworks.redis.experimental.mbean; | |
import java.net.InetSocketAddress; | |
import java.net.SocketAddress; | |
import javax.management.JMException; | |
import javax.management.ObjectName; | |
import com.lambdaworks.redis.ConnectionId; | |
/** | |
* @author <a href="mailto:[email protected]">Mark Paluch</a> | |
*/ | |
public class Connection implements ConnectionMBean { | |
private final ObjectName objectName; | |
private String state = "unknown"; | |
private final SocketAddress local; | |
private final SocketAddress remote; | |
private Long disconnectedSince = null; | |
public Connection(ObjectName clientObjectName, ConnectionId connectionId) throws JMException { | |
local = connectionId.localAddress(); | |
remote = connectionId.remoteAddress(); | |
StringBuffer id = new StringBuffer(); | |
id.append("name=" + clientObjectName.getKeyProperty("name")); | |
id.append(",id=" + clientObjectName.getKeyProperty("id")); | |
if (remote instanceof InetSocketAddress) { | |
InetSocketAddress isa = (InetSocketAddress) remote; | |
id.append(",protocol=TCP"); | |
id.append(",host=" + isa.getHostName()); | |
id.append(",port=" + "" + isa.getPort()); | |
} else { | |
id.append(",protocol=" + remote.getClass().getSimpleName()); | |
id.append(",remote=" + remote.toString()); | |
} | |
if (local instanceof InetSocketAddress) { | |
InetSocketAddress isa = (InetSocketAddress) local; | |
id.append(",localPort=" + isa.getPort()); | |
} else { | |
id.append(",local=" + remote.toString()); | |
} | |
objectName = ObjectName.getInstance(clientObjectName.getDomain() + ":" + id); | |
} | |
public ObjectName getObjectName() { | |
return objectName; | |
} | |
public Long getDisconnectedSince() { | |
return disconnectedSince; | |
} | |
public void setDisconnectedSince(Long disconnectedSince) { | |
this.disconnectedSince = disconnectedSince; | |
} | |
@Override | |
public String getLocalAddress() { | |
return local.toString(); | |
} | |
@Override | |
public String getRemoteAddress() { | |
return remote.toString(); | |
} | |
@Override | |
public String getState() { | |
return state; | |
} | |
void updateState(String state) { | |
this.state = state; | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.lambdaworks.redis.experimental.mbean; | |
/**
 * JMX management interface of a connection MBean, exposing three read-only string attributes.
 */
public interface ConnectionMBean {

    /**
     * Local endpoint of the connection.
     *
     * @return the local address
     */
    String getLocalAddress();

    /**
     * Remote endpoint of the connection.
     *
     * @return the remote address
     */
    String getRemoteAddress();

    /**
     * Current state of the connection.
     *
     * @return the connection state
     */
    String getState();
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.lambdaworks.redis.experimental.mbean; | |
import org.junit.Test; | |
import com.lambdaworks.redis.RedisClient; | |
import com.lambdaworks.redis.RedisConnection; | |
import com.lambdaworks.redis.RedisURI; | |
import com.lambdaworks.redis.TestSettings; | |
/** | |
* JMX Demo Runner. Registers JMX Monitoring and creates/closes periodically connections. | |
* | |
* @author <a href="mailto:[email protected]">Mark Paluch</a> | |
*/ | |
public class JMXDemo { | |
@Test | |
public void run() throws Exception { | |
RedisClient redisClient = RedisClient.create(RedisURI.Builder.redis(TestSettings.host(), TestSettings.port()).build()); | |
Client.create(redisClient); | |
while (true) { | |
Thread.sleep(5000); | |
System.out.println("Connecting"); | |
RedisConnection<String, String> connect = redisClient.connect(); | |
connect.ping(); | |
Thread.sleep(1000); | |
connect.close(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment