Cassandra 2.1 boot camp, Overview
- 1. CASSANDRA SUMMIT SF 2014 CONTRIBUTOR BOOT CAMP. Aaron Morton (@aaronmorton), Co-Founder & Principal Consultant. Licensed under a Creative Commons Attribution-NonCommercial 3.0 New Zealand License.
- 2. Architecture; Startup, Shutdown & Failure; StorageProxy; MessagingService; Gossip
- 3. Dynamo Cluster Architecture (diagram): clients talk to Node 1 and Node 2, and each node is a stack of APIs, Dynamo, Database and Disk layers.
- 4. API Layer: o.a.c.auth, o.a.c.cql3, o.a.c.metrics, o.a.c.thrift, o.a.c.transport (o.a.c = org.apache.cassandra)
- 5. The API layer talks to the Dynamo layer by passing Commands through the StorageProxy.
- 6. Dynamo Layer: o.a.c.dht, o.a.c.gms, o.a.c.locator, o.a.c.net, o.a.c.repair, o.a.c.service, o.a.c.streaming
- 7. The Dynamo layer talks to the Database layer by sending messages to IVerbHandlers via the MessagingService.
- 8. Database Layer: o.a.c.cache, o.a.c.concurrent, o.a.c.db, o.a.c.io, o.a.c.serializers
- 9. Global Services: o.a.c.config, o.a.c.trace, o.a.c.utils
- 10. Architecture; Startup, Shutdown & Failure; StorageProxy; MessagingService; Gossip
- 11. o.a.c.service.CassandraDaemon.main(): creates the singleton daemon, starts the MBean, then calls setup() ("here be magic").
- 12. o.a.c.service.CassandraDaemon.setup(): set up JNA; Thread.setDefaultUncaughtExceptionHandler(); check that the data directories exist; SystemKeyspace.checkHealth(); DatabaseDescriptor.loadSchemas(); CFS.disableAutoCompaction() (CFS = ColumnFamilyStore).
- 13. o.a.c.service.CassandraDaemon.setup() (continued): CommitLog.recover(); StorageService.registerDaemon(); StorageService.initServer();
- 14. Exception Hook: updates the exception metrics, then calls FileUtils.handleFSError() for file system errors and FileUtils.handleCorruptSSTable() for corrupt SSTables.
- 15. Shutdown and Drain Hook: shut down client transports, shut down thread pools, blocking flush to disk, shut down the commit log.
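The shutdown and drain hook runs when the JVM exits. Below is a minimal Java sketch of the same ordering, registered with the standard Runtime.addShutdownHook API; the class and step method names are hypothetical stand-ins that only record their names, not Cassandra's drain code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: the step methods only record their names;
// they are not Cassandra's real drain code.
class DrainSketch {
    // Order in which the steps ran, so it can be inspected afterwards.
    static final List<String> steps = new ArrayList<>();

    static void shutdownClientTransports() { steps.add("client transports"); }
    static void shutdownThreadPools()      { steps.add("thread pools"); }
    static void blockingFlushToDisk()      { steps.add("flush"); }
    static void shutdownCommitLog()        { steps.add("commit log"); }

    // Stop new client requests first, flush memtables, and only then
    // close the commit log that protected the unflushed data.
    static void drain() {
        shutdownClientTransports();
        shutdownThreadPools();
        blockingFlushToDisk();
        shutdownCommitLog();
    }

    public static void main(String[] args) {
        // The JVM runs this thread on normal exit or on SIGTERM.
        Runtime.getRuntime().addShutdownHook(new Thread(DrainSketch::drain, "drain-hook"));
        System.out.println("drain hook registered");
    }
}
```

The order matters: client transports close first so no new writes arrive, and the commit log closes last, once the flush has put everything it protected on disk.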
- 16. Architecture; Startup, Shutdown & Failure; StorageProxy; MessagingService; Gossip
- 17. o.a.c.service.StorageProxy: cluster-wide storage operations. It selects endpoints and checks that enough replicas are available for the consistency level (CL), sends messages to Stages, waits for the responses and stores hints.
- 18. o.a.c.service.IResponseResolver: preprocess(MessageIn<T> message); resolve() throws DigestMismatchException. Implementations: RowDigestResolver, RowDataResolver, RangeSliceResponseResolver.
- 19. Response Handlers / Callbacks: implement IAsyncCallback<T>, i.e. response(MessageIn<T> msg).
- 20. o.a.c.service.ReadCallback.get(): wait for blockfor responses, including the data response, with condition.await(timeout, TimeUnit.MILLISECONDS); if the condition is not set, throw ReadTimeoutException(); otherwise resolver.resolve().
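The condition.await(timeout, TimeUnit.MILLISECONDS) in ReadCallback.get() is a bounded wait for blockfor replies. Below is a minimal sketch of the same pattern. It uses java.util.concurrent.CountDownLatch in place of Cassandra's own condition object and TimeoutException in place of ReadTimeoutException. The class name is hypothetical, and resolve() is a placeholder that just returns the replies; it does not check that one of them is a data response.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch of the ReadCallback pattern: wait for blockFor replica
// responses, then resolve. CountDownLatch stands in for Cassandra's condition
// and TimeoutException for ReadTimeoutException.
class ReadCallbackSketch<T> {
    private final CountDownLatch condition;
    private final List<T> responses = new CopyOnWriteArrayList<>();
    private final long timeoutMillis;

    ReadCallbackSketch(int blockFor, long timeoutMillis) {
        this.condition = new CountDownLatch(blockFor);
        this.timeoutMillis = timeoutMillis;
    }

    // Called by the messaging layer for every replica reply (IAsyncCallback.response).
    void response(T msg) {
        responses.add(msg);
        condition.countDown();
    }

    // Blocks the coordinator thread until enough replicas have answered.
    List<T> get() throws InterruptedException, TimeoutException {
        if (!condition.await(timeoutMillis, TimeUnit.MILLISECONDS))
            throw new TimeoutException("received " + responses.size() + " responses");
        return resolve();
    }

    // Placeholder for resolver.resolve(): returns whatever arrived.
    private List<T> resolve() { return responses; }

    public static void main(String[] args) throws Exception {
        ReadCallbackSketch<String> cb = new ReadCallbackSketch<>(2, 100);
        cb.response("replica-1");
        cb.response("replica-2");
        System.out.println(cb.get());
    }
}
```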
- 21. o.a.c.service.StorageProxy.fetchRows(): first AbstractReadExecutor.getReadExecutor(); exec.executeAsync(); exec.maybeTryAdditionalReplicas(); then AbstractReadExecutor.get() (handler.get()), catching DigestMismatchException and ReadTimeoutException.
- 22. AbstractReadExecutor.getReadExecutor(): StorageProxy.getLiveSortedEndpoints(); CFMetaData.newReadRepairDecision(); ConsistencyLevel.filterForQuery(); ConsistencyLevel.assureSufficientLiveNodes(); …
- 23. AbstractReadExecutor.getReadExecutor() picks the executor: with no speculative retry, or when blocking for all replicas, return new NeverSpeculatingReadExecutor(); when always retrying, or when already targeting all replicas, return new AlwaysSpeculatingReadExecutor(); otherwise return new SpeculatingReadExecutor().
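The executor choice above is a small decision table. Below is a sketch of it. It assumes the speculative retry setting is reduced to a hypothetical RetryType enum and the replica lists are reduced to counts (blockFor, targetReplicas, allReplicas). It follows only the slide's three rules, not the full 2.1 method.

```java
// Hypothetical sketch of the three rules on the slide. RetryType, Executor and
// the integer inputs are illustrative stand-ins for the table's speculative
// retry setting and the replica lists.
class ReadExecutorChoice {
    enum RetryType { NONE, ALWAYS, PERCENTILE, CUSTOM }
    enum Executor { NEVER_SPECULATING, ALWAYS_SPECULATING, SPECULATING }

    static Executor choose(RetryType retry, int blockFor, int targetReplicas, int allReplicas) {
        // no retry, or we already block for every replica: nothing left to speculate on
        if (retry == RetryType.NONE || blockFor == allReplicas)
            return Executor.NEVER_SPECULATING;
        // always retry, or we are already sending to every replica
        if (retry == RetryType.ALWAYS || targetReplicas == allReplicas)
            return Executor.ALWAYS_SPECULATING;
        // otherwise only send an extra request if a replica turns out to be slow
        return Executor.SPECULATING;
    }

    public static void main(String[] args) {
        // e.g. QUORUM with RF=3: block for 2 of 3 replicas, percentile-based retry
        System.out.println(choose(RetryType.PERCENTILE, 2, 2, 3)); // prints SPECULATING
    }
}
```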
- 24. AbstractReadExecutor() constructor: resolver = new RowDigestResolver(); handler = new ReadCallback<>();
- 25. AbstractReadExecutor.executeAsync(): makeDataRequests sends the full command with MessagingService.instance().sendRR(command.createMessage(), endpoint, handler); makeDigestRequests sends a digest-only copy: ReadCommand digestCommand = command.copy(); digestCommand.setDigestQuery(true); MessageOut<?> message = digestCommand.createMessage(); MessagingService.instance().sendRR(message, endpoint, handler);
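Data and digest requests let the coordinator compare replicas cheaply. One replica returns the full row, the rest return only a hash of it, and a mismatch is what RowDigestResolver reports as DigestMismatchException. Below is a sketch of the comparison. It assumes rows are plain strings and uses MD5 (via java.security.MessageDigest) as the hash. The class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: one replica answers the data request with the row,
// the others answer digest requests with a hash of their copy of the row.
class DigestReadSketch {
    static byte[] digest(String row) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            return md5.digest(row.getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // every Java SE platform must provide MD5
        }
    }

    // True when every digest response matches the hash of the data response.
    static boolean digestsMatch(String dataResponse, List<byte[]> digestResponses) {
        byte[] expected = digest(dataResponse);
        for (byte[] d : digestResponses)
            if (!Arrays.equals(expected, d))
                return false; // where the resolver would throw DigestMismatchException
        return true;
    }

    public static void main(String[] args) {
        String data = "k1:v1@ts=10";
        List<byte[]> digests = Arrays.asList(digest("k1:v1@ts=10"), digest("k1:v0@ts=5"));
        System.out.println(digestsMatch(data, digests)); // false: one replica is stale
    }
}
```

On a mismatch the read has to fall back to full data from the replicas, which is why only one replica is asked for the data up front.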
- 26. StorageProxy.mutateAtomically(): first wrapResponseHandler(); AbstractWriteResponseHandler.assureSufficientLiveNodes(); then getBatchlogEndpoints(); syncWriteToBatchlog() (all mutations); syncWriteBatchedMutations() (all wrappers); asyncRemoveFromBatchlog(); catching UnavailableException and WriteTimeoutException.
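The order of the batchlog steps is what makes the batch atomic. The whole batch is stored before any replica is written, so a coordinator that fails part way leaves a batchlog entry behind that can be replayed. Below is an in-memory sketch. The maps and lists stand in for the batchlog endpoints and replicas, replay() is a hypothetical stand-in for batchlog replay, and the timeout is simulated with a flag.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical in-memory sketch of the atomic batch order. Plain collections
// stand in for the batchlog endpoints and the natural replicas.
class AtomicBatchSketch {
    final Map<UUID, List<String>> batchlog = new HashMap<>();
    final List<String> replicas = new ArrayList<>();

    void mutateAtomically(List<String> mutations, boolean timeOutAfterBatchlog) {
        UUID batchId = UUID.randomUUID();
        // 1. syncWriteToBatchlog(): persist the whole batch before touching replicas
        batchlog.put(batchId, new ArrayList<>(mutations));
        if (timeOutAfterBatchlog)
            throw new IllegalStateException("simulated WriteTimeoutException");
        // 2. syncWriteBatchedMutations(): apply every mutation to its replicas
        replicas.addAll(mutations);
        // 3. asyncRemoveFromBatchlog(): batch done, drop the log entry (inline here)
        batchlog.remove(batchId);
    }

    // Stand-in for batchlog replay: anything still logged is applied again.
    void replay() {
        for (List<String> ms : batchlog.values())
            replicas.addAll(ms);
        batchlog.clear();
    }

    public static void main(String[] args) {
        AtomicBatchSketch node = new AtomicBatchSketch();
        try {
            node.mutateAtomically(Arrays.asList("m1", "m2"), true);
        } catch (IllegalStateException e) {
            System.out.println("timed out; batchlog entries: " + node.batchlog.size());
        }
        node.replay();
        System.out.println(node.replicas); // [m1, m2]
    }
}
```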
- 27. StorageProxy.wrapResponseHandler(): StorageService.getNaturalEndpoints(); TokenMetadata.pendingEndpointsFor(); AbstractReplicationStrategy.getWriteResponseHandler(). AbstractWriteResponseHandler implementations: WriteResponseHandler, DatacenterWriteResponseHandler, DatacenterSyncWriteResponseHandler, ReplayWriteResponseHandler.
- 28. StorageProxy.syncWriteBatchedMutations(): write to the natural and pending endpoints with sendToHintedEndpoints(), then AbstractWriteResponseHandler.get().
- 29. StorageProxy.sendToHintedEndpoints(): loop over all targets; for the local DC, MessagingService.sendRR(); group messages for remote DCs with dcGroups.get(dc).add(destination); write hints for down nodes with submitHint(); then sendMessagesToNonlocalDC().
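The target loop in sendToHintedEndpoints() sorts each replica into one of three buckets. Below is a sketch. It assumes endpoints are strings, that each endpoint's DC comes from a map, and that liveness comes from a set. All names are hypothetical, and "sending" only records where a message would go.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the target loop; endpoints are plain strings.
class HintedSendSketch {
    final List<String> sentLocally = new ArrayList<>();
    final Map<String, List<String>> dcGroups = new HashMap<>();
    final List<String> hints = new ArrayList<>();

    void sendToHintedEndpoints(List<String> targets, Map<String, String> dcOf,
                               Set<String> alive, String localDc) {
        for (String destination : targets) {
            String dc = dcOf.get(destination);
            if (!alive.contains(destination)) {
                hints.add(destination);        // submitHint(): deliver later
            } else if (dc.equals(localDc)) {
                sentLocally.add(destination);  // MessagingService.sendRR() right away
            } else {
                // batch per remote DC; sendMessagesToNonlocalDC() sends each group afterwards
                dcGroups.computeIfAbsent(dc, k -> new ArrayList<>()).add(destination);
            }
        }
    }

    public static void main(String[] args) {
        Map<String, String> dcOf = new HashMap<>();
        dcOf.put("10.0.0.1", "dc1");
        dcOf.put("10.0.0.2", "dc1");
        dcOf.put("10.1.0.1", "dc2");
        dcOf.put("10.1.0.2", "dc2");
        Set<String> alive = new HashSet<>(Arrays.asList("10.0.0.1", "10.1.0.1", "10.1.0.2"));
        HintedSendSketch s = new HintedSendSketch();
        s.sendToHintedEndpoints(Arrays.asList("10.0.0.1", "10.0.0.2", "10.1.0.1", "10.1.0.2"),
                                dcOf, alive, "dc1");
        // prints [10.0.0.1] {dc2=[10.1.0.1, 10.1.0.2]} hints=[10.0.0.2]
        System.out.println(s.sentLocally + " " + s.dcGroups + " hints=" + s.hints);
    }
}
```

Grouping by DC lets the coordinator send one message per remote DC. As we understand the 2.1 code, a replica in that DC then forwards the message to the others there, so each copy does not have to cross the inter-DC link.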
- 30. Architecture; Startup, Shutdown & Failure; StorageProxy; MessagingService; Gossip
- 31. MessagingService: the transport layer. It uses custom serialisation over TCP sockets, and the serialisers are spread around the code.
- 32. o.a.c.net.MessagingService.Verb <<enum>>: MUTATION, READ, REQUEST_RESPONSE, TREE_REQUEST, TREE_RESPONSE (and more...)
- 33. o.a.c.net.MessagingService.verbHandlers: new EnumMap<Verb, IVerbHandler>(Verb.class)
- 34. o.a.c.net.IVerbHandler<T>: doVerb(MessageIn<T> message, String id);
- 35. o.a.c.net.MessageIn<T>: public class MessageIn<T> { public final InetAddress from; public final T payload; public final Map<String, byte[]> parameters; public final MessagingService.Verb verb; public final int version; … }
- 36. o.a.c.net.MessageOut<T>: public class MessageOut<T> { public final InetAddress from; public final MessagingService.Verb verb; public final T payload; public final IVersionedSerializer<T> serializer; public final Map<String, byte[]> parameters; … }
- 37. o.a.c.net.MessagingService.verbStages: new EnumMap<MessagingService.Verb, Stage>(MessagingService.Verb.class)
- 38. o.a.c.net.MessagingService.verbStages: put(Verb.MUTATION, Stage.MUTATION); put(Verb.READ, Stage.READ); put(Verb.REQUEST_RESPONSE, Stage.REQUEST_RESPONSE);
- 39. o.a.c.net.MessagingService.receive(): runnable = new MessageDeliveryTask(message, id, timestamp); stage = StageManager.getStage(message.getMessageType()); stage.execute(runnable);
- 40. o.a.c.net.MessageDeliveryTask.run(): if the verb is droppable and the message is older than the rpc_timeout, MessagingService.incrementDroppedMessages(verb); otherwise look up MessagingService.getVerbHandler(verb) and call verbHandler.doVerb(message, id).
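Slides 33 to 40 describe a two-step dispatch. The verb selects a Stage (a thread pool), and then, on that stage's thread, the verb selects the IVerbHandler. Below is a sketch using EnumMaps and one single-thread ExecutorService per stage. Verb, Stage and VerbHandler are cut-down hypothetical stand-ins, and the set of droppable verbs is an assumption.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of receive() -> stage -> MessageDeliveryTask -> doVerb().
class VerbDispatchSketch {
    enum Verb { MUTATION, READ, REQUEST_RESPONSE }
    enum Stage { MUTATION, READ, REQUEST_RESPONSE }
    interface VerbHandler { void doVerb(String payload, String id); }

    // Illustrative subset; which verbs are droppable is an assumption here.
    static final Set<Verb> DROPPABLE = EnumSet.of(Verb.MUTATION, Verb.READ);

    final Map<Verb, Stage> verbStages = new EnumMap<>(Verb.class);
    final Map<Verb, VerbHandler> verbHandlers = new EnumMap<>(Verb.class);
    final Map<Stage, ExecutorService> stages = new EnumMap<>(Stage.class);
    final AtomicInteger dropped = new AtomicInteger();

    VerbDispatchSketch() {
        verbStages.put(Verb.MUTATION, Stage.MUTATION);
        verbStages.put(Verb.READ, Stage.READ);
        verbStages.put(Verb.REQUEST_RESPONSE, Stage.REQUEST_RESPONSE);
        for (Stage s : Stage.values())
            stages.put(s, Executors.newSingleThreadExecutor()); // one pool per stage
    }

    // receive(): wrap the message in a task and hand it to its verb's stage.
    void receive(Verb verb, String payload, String id, long createdAt, long timeoutMillis) {
        stages.get(verbStages.get(verb))
              .execute(() -> deliver(verb, payload, id, createdAt, timeoutMillis));
    }

    // MessageDeliveryTask.run(): drop stale droppable messages, else dispatch.
    void deliver(Verb verb, String payload, String id, long createdAt, long timeoutMillis) {
        if (DROPPABLE.contains(verb) && System.currentTimeMillis() - createdAt > timeoutMillis) {
            dropped.incrementAndGet(); // incrementDroppedMessages(verb)
            return;
        }
        verbHandlers.get(verb).doVerb(payload, id);
    }

    void shutdown() throws InterruptedException {
        for (ExecutorService pool : stages.values()) {
            pool.shutdown();
            pool.awaitTermination(1, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        VerbDispatchSketch ms = new VerbDispatchSketch();
        ms.verbHandlers.put(Verb.READ, (payload, id) -> System.out.println("READ " + id + ": " + payload));
        long now = System.currentTimeMillis();
        ms.receive(Verb.READ, "fresh", "1", now, 10_000);
        ms.receive(Verb.READ, "stale", "2", now - 60_000, 10_000);
        ms.shutdown();
        System.out.println("dropped: " + ms.dropped.get());
    }
}
```

Checking the age on the stage thread, rather than on receipt, means a message that waited too long in a backed-up queue is dropped instead of doing work whose caller has already timed out.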
- 41. Architecture; Startup, Shutdown & Failure; StorageProxy; MessagingService; Gossip
- 42. o.a.c.gms.ApplicationState: STATUS, LOAD, SCHEMA, DC, RACK, RELEASE_VERSION, REMOVAL_COORDINATOR, INTERNAL_IP, RPC_ADDRESS, SEVERITY, NET_VERSION …
- 43. o.a.c.gms.VersionedValue: public final int version; public final String value;
- 44. o.a.c.gms.VersionGenerator { private static final AtomicInteger version = new AtomicInteger(0); public static int getNextVersion() { return version.incrementAndGet(); } }
- 45. o.a.c.gms.EndpointState { private volatile HeartBeatState hbState; final Map<ApplicationState, VersionedValue> applicationState = new NonBlockingHashMap<ApplicationState, VersionedValue>(); }
- 46. o.a.c.gms.HeartBeatState { private int generation; private int version; … }
- 47. o.a.c.db.SystemKeyspace.incrementAndGetGeneration(): SELECT gossip_generation FROM system.local WHERE key='local'; if there is none, generation = (int) (System.currentTimeMillis() / 1000); otherwise generation = (int) (System.currentTimeMillis() / 1000), plus some other checks.
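The generation is wall-clock seconds since the epoch, so a restarted node normally gets a higher generation than it had before. The generation:1410220170 in the nodetool gossipinfo output is such a value. Below is a sketch of the choice, where the stored value stands in for the system.local lookup. Reading the "other checks" as "at least the stored generation plus one" is our assumption about the 2.1 code; the slide does not state it.

```java
// Hypothetical sketch of picking the gossip generation at startup.
// `stored` stands in for gossip_generation from system.local (null if absent).
// ASSUMPTION: the "other checks" keep the new value above the stored one.
class GenerationSketch {
    static int nextGeneration(Integer stored, long nowMillis) {
        int now = (int) (nowMillis / 1000); // seconds since the epoch
        if (stored == null)
            return now;
        // peers ignore state with a lower generation than they have seen,
        // so never go backwards even if the clock did
        return Math.max(stored + 1, now);
    }

    public static void main(String[] args) {
        System.out.println(nextGeneration(null, System.currentTimeMillis()));
    }
}
```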
- 48. nodetool gossipinfo
  generation:1410220170
  heartbeat:37
  LOAD:1.57821104E8
  STATUS:NORMAL,-1007384361686170050
  RACK:rack1
  NET_VERSION:8
  SEVERITY:0.0
  RELEASE_VERSION:2.1.0-rc5
  SCHEMA:f3b70c8e-a904-3de9-ac5d-8ab30271441d
  HOST_ID:4aac20b5-3c68-4a26-a415-2e2f2ff0ed46
  RPC_ADDRESS:127.0.0.1
- 49. o.a.c.gms.Gossiper.GossipTask.run(): gossips every second with 1 to 3 nodes, using a three-step process.
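The "1 to 3 nodes" per round come from picking one random live peer, sometimes an unreachable peer, and sometimes a seed. Below is a simplified sketch of that selection. The probability rules and all names are assumptions for illustration, not the exact 2.1 logic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical, simplified sketch of how one gossip round picks its peers.
// The probability rules are illustrative, not Cassandra's exact code.
class GossipTargetsSketch {
    static List<String> pickTargets(List<String> live, List<String> unreachable,
                                    List<String> seeds, Random rand) {
        List<String> targets = new ArrayList<>();
        // 1. one random live peer, when there is one
        if (!live.isEmpty())
            targets.add(live.get(rand.nextInt(live.size())));
        // 2. sometimes one unreachable peer, to notice when it comes back;
        //    more likely the more peers are down relative to those up
        double pUnreachable = unreachable.size() / (live.size() + 1.0);
        if (!unreachable.isEmpty() && rand.nextDouble() < pUnreachable)
            targets.add(unreachable.get(rand.nextInt(unreachable.size())));
        // 3. sometimes a seed, if none was picked yet, to help the cluster converge
        boolean pickedSeed = targets.stream().anyMatch(seeds::contains);
        if (!seeds.isEmpty() && !pickedSeed && (targets.isEmpty() || rand.nextBoolean()))
            targets.add(seeds.get(rand.nextInt(seeds.size())));
        return targets;
    }

    public static void main(String[] args) {
        List<String> live = Arrays.asList("n1", "n2", "n3");
        List<String> down = Collections.singletonList("n4");
        List<String> seeds = Collections.singletonList("n1");
        Random rand = new Random();
        for (int round = 0; round < 3; round++) // Cassandra runs one round per second
            System.out.println(pickTargets(live, down, seeds, rand));
    }
}
```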
- 50. Each step is processed by an IVerbHandler: I send a SYN, the remote node replies with an ACK, and I send an ACK2.
- 51. o.a.c.gms.GossipDigestSyn: exchanges a List<GossipDigest>, where GossipDigest { final InetAddress endpoint; final int generation; final int maxVersion; … }
- 52. o.a.c.gms.Gossiper.examineGossiper(): if the SYN is empty, send all my info (shadow gossip). Then, for each digest: if (remoteGeneration == localGeneration && maxRemoteVersion == maxLocalVersion), do nothing; else if (remoteGeneration > localGeneration), request everything from the gossiper; else if (remoteGeneration < localGeneration), send all data with generation = localGeneration and version > …
- 53. o.a.c.gms.Gossiper.examineGossiper() (continued): else if (remoteGeneration == localGeneration): if the max remote version is greater, we ask the remote endpoint to send us all the data for this endpoint with a version greater than the max version we hold locally for it. If the max remote version is lower, we send all the data we hold locally for this endpoint with a version greater than the max remote version.
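The two examineGossiper() slides reduce to comparing one (generation, maxVersion) pair per endpoint. Below is a sketch of that decision. The Action enum is hypothetical, and the real method also builds the digest and state lists it sends back.

```java
// Hypothetical sketch of the per-endpoint decision: compare the
// (generation, maxVersion) pair from a received GossipDigest with our own
// copy of that endpoint's state.
class ExamineGossiperSketch {
    enum Action { NOTHING, REQUEST_ALL, SEND_ALL, REQUEST_NEWER, SEND_NEWER }

    static Action examine(int remoteGeneration, int maxRemoteVersion,
                          int localGeneration, int maxLocalVersion) {
        if (remoteGeneration == localGeneration && maxRemoteVersion == maxLocalVersion)
            return Action.NOTHING;       // both sides already agree
        if (remoteGeneration > localGeneration)
            return Action.REQUEST_ALL;   // endpoint restarted; our copy is from an old generation
        if (remoteGeneration < localGeneration)
            return Action.SEND_ALL;      // the peer's copy is from an old generation
        // same generation: the side with the higher version holds the newer values
        return maxRemoteVersion > maxLocalVersion ? Action.REQUEST_NEWER : Action.SEND_NEWER;
    }

    public static void main(String[] args) {
        // same generation, the peer has seen version 40 while we only have 37
        System.out.println(examine(1410220170, 40, 1410220170, 37)); // prints REQUEST_NEWER
    }
}
```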
- 54. Thanks.
- 55. Aaron Morton (@aaronmorton), Co-Founder & Principal Consultant, www.thelastpickle.com. Licensed under a Creative Commons Attribution-NonCommercial 3.0 New Zealand License.
Cassandra Summit Boot Camp, 2014 Introduction, Aaron Morton presenter