A Cassandra + Solr + Spark Love Triangle Using DataStax Enterprise
- 1. "We know stuff. And get people excited about it." Rachel Pedreschi (@RachelPedreschi) and Patrick McFadin (@PatrickMcFadin): A Cassandra + Solr + Spark Love Triangle Using DataStax Enterprise
- 2. The ad-hociness spectrum: C* (not ad hoc: all queries preplanned, very fast), then Search, then Analytics (fully ad hoc: ask anything, anytime, but slowest)
- 3. What is Search?
- 4. The bright blue butterfly hangs on the breeze. [the] [bright] [blue] [butterfly] [hangs] [on] [the] [breeze] Tokens
- 5. Credit: https://developer.apple.com/library/mac/documentation/userexperience/conceptual/SearchKitConcepts/searchKit_basics/searchKit_basics.html Terms
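To make the tokens-and-terms idea concrete, here is a minimal sketch in plain Scala (not Solr or Lucene code) of an analyzer that turns text into tokens and an inverted index that maps each term to the documents containing it:

```scala
// Sketch of what a search engine does with text (illustration only).

// Tokenize: lowercase, split on non-letters, drop empty strings
def tokenize(text: String): Seq[String] =
  text.toLowerCase.split("[^a-z]+").filter(_.nonEmpty).toSeq

// Inverted index: term -> set of document ids containing that term
def invertedIndex(docs: Map[Int, String]): Map[String, Set[Int]] =
  docs.toSeq
    .flatMap { case (id, text) => tokenize(text).map(term => (term, id)) }
    .groupBy { case (term, _) => term }
    .map { case (term, pairs) => term -> pairs.map(_._2).toSet }

val index = invertedIndex(Map(
  1 -> "The bright blue butterfly hangs on the breeze.",
  2 -> "A bright idea"
))
// A query for "bright" is now a single map lookup, not a scan of every document.
```

Real analyzers also apply stemming, stop words, and per-field rules, but the lookup structure is the same idea.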
- 6. It can be lonely for Solr
- 7. [Logos: Cassandra + Solr = DataStax Enterprise]
- 8. Cassandra ✓ Highly available ✓ Linear scalability ✓ Low latency OLTP queries C*
- 9. Like… High Availability
- 10. Data Partitioning: the application's write is routed within Data Center 1 by hash(key) => token(43) to the node that owns that token on the ring (tokens 10, 20, 30, 40, 50, 60, 70, 80)
- 11. Replication: with replication factor = 3, the row at token(43) is stored on the owning node and on the next two nodes around the ring
- 12. Multi-Data Center Replication: Data Center 2 keeps its own ring with replication factor = 3, so the same write is replicated independently in both data centers
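The partitioning and replication slides above can be sketched as a toy consistent-hashing ring. This is illustrative only: real Cassandra uses the Murmur3 partitioner and configurable replication strategies, and `tokenFor` here is an invented stand-in hash for the ring's token space.

```scala
// Toy consistent-hashing sketch (illustration only; not Cassandra internals).
val ring = Vector(10, 20, 30, 40, 50, 60, 70, 80) // one token per node

// Stand-in for hash(key) => token; an invented hash for the 0-79 token space
def tokenFor(key: String): Int = math.abs(key.hashCode) % 80

// The owning node is the first ring position at or past the token (wrapping)
def ownerIndex(token: Int): Int = {
  val idx = ring.indexWhere(_ >= token)
  if (idx >= 0) idx else 0
}

// With replication factor rf, the next rf - 1 nodes on the ring also hold copies
def replicas(token: Int, rf: Int): Seq[Int] = {
  val start = ownerIndex(token)
  (0 until rf).map(i => ring((start + i) % ring.size))
}

// token 43 (as on the slide) with rf = 3 lands on nodes 50, 60, 70
```

This mirrors the slide: hash(key) => token(43), the node owning range (40, 50] takes the write, and with replication factor 3 the next two nodes clockwise hold the replicas.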
- 13. How does DSE integrate Solr? In a single cluster, plain C* nodes serve the transactional workload and C*/Solr nodes serve search.
- 14. SELECT * FROM killrvideo.videos
        WHERE solr_query='{ "q": "{!edismax qf=\"name^2 tags^1 description\"}datastax" }';

      SELECT id, value FROM keyspace.table
        WHERE token(id) >= -3074457345618258601
          AND token(id) <= 3074457345618258603
          AND solr_query='id:*'
- 15. Vocab (Cassandra term → Solr term): Column Family/Table → Core; Row → Document; Column → Field; SSTable → Index
- 16. [Diagram: the Cassandra write path. Each write request goes from the client to a coordinator node, which appends it to the on-disk, append-only commit log and writes it into the in-memory memtable (one per CQL table) before acknowledging. Periodically the memtable's current state is flushed to an immutable SSTable on the node's file system, and related SSTables are periodically compacted.]
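The write path on this slide can be modeled as a toy node. This is a sketch of the idea only, not Cassandra internals: every write is appended to a commit log and applied to a memtable, and a "full" memtable is flushed to an immutable SSTable.

```scala
// Toy model of the write path (illustration only, not Cassandra code):
// durability via an append-only commit log, reads/writes via a memtable,
// and periodic flushes of the memtable to immutable SSTables.
import scala.collection.mutable

case class Write(key: String, column: String, value: String)

class ToyNode(flushThreshold: Int) {
  val commitLog = mutable.ArrayBuffer.empty[Write]          // append-only log
  val memtable  = mutable.Map.empty[String, Map[String, String]]
  val sstables  = mutable.ArrayBuffer.empty[Map[String, Map[String, String]]]

  def write(w: Write): Unit = {
    commitLog += w                                          // durability first
    val row = memtable.getOrElse(w.key, Map.empty)
    memtable(w.key) = row + (w.column -> w.value)
    if (memtable.size >= flushThreshold) flush()            // "periodically"
  }

  def flush(): Unit = {                                     // memtable -> SSTable
    sstables += memtable.toMap
    memtable.clear()
  }
}
```

Compaction (merging related SSTables) is omitted here; the point is only the commit-log-then-memtable-then-flush shape of the slide.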
- 17. [Diagram: the DSE Search indexing path. Each write request also goes through a shard router into a Lucene RAM buffer holding in-memory index segments (term → posting list, e.g. "blue" → documents 2, 3). The RAM buffer periodically flushes its current state to a segment (soft commit); on a C* memtable flush, the in-memory segments are hard committed to disk; segments are merged periodically (a stop-the-world operation).]
- 18. [Diagram: the same picture, highlighting visibility: documents still in the RAM buffer are not yet searchable; documents in flushed segments are searchable.]
- 19. And… Scalability
- 20. [Diagram: Data Center 1 as an 8-node token ring (tokens 10-80) serving the application]
- 21. [Diagram: the same ring, with the application's requests spread across all of the nodes]
- 22. [Diagram: the ring after scaling out: new nodes are added and the token ranges are redistributed (tokens 8, 16, 24, ..., 80), so each node owns a smaller slice of the data]
- 23. Even… Improved Performance
- 24. [Chart: standard Solr indexing vs. DSE Search live indexing]
- 25. Spark and Cassandra
- 26. Great combo: Cassandra stores a ton of data, Spark analyzes a ton of data
- 27. Great combo: Spark Streaming (near real-time), SparkSQL (structured data), MLlib (machine learning), GraphX (graph analysis)
- 28. Great combo: the Spark Connector connects Spark Streaming, SparkSQL, MLlib, and GraphX to Cassandra tables, e.g.:
      CREATE TABLE raw_weather_data (
          wsid text, year int, month int, day int, hour int,
          temperature double, dewpoint double, pressure double,
          wind_direction int, wind_speed double,
          sky_condition int, sky_condition_text text,
          one_hour_precip double, six_hour_precip double,
          PRIMARY KEY ((wsid), year, month, day, hour)
      ) WITH CLUSTERING ORDER BY (year DESC, month DESC, day DESC, hour DESC);
- 29. How does it work? OSS stack: a Spark Master, a Worker on each server, and Executors launched by the Worker.
- 30. How does it work? The full token range (0-100) is split into the ranges 0-24, 25-49, 50-74, and 75-99, one per Worker, so each Worker only analyzes 25% of the data.
- 31. [Diagram: workload isolation: transactional nodes on one side, analytics nodes (a Master plus four Workers) on the other, each Worker responsible for one token range: 0-24, 25-49, 50-74, 75-99]
- 32. For the 75-99 range, the Spark Connector maps that node's data into a Spark RDD of Spark Partitions on the Worker's Executors, reading with:
      SELECT * FROM keyspace.table WHERE token(pk) > 75 AND token(pk) <= 99
- 33. [Diagram: the Master, the Worker for range 75-99, and its Executors processing the RDD's Spark Partitions]
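The token-range-to-partition mechanics in the last two slides can be sketched as follows. The even splitting rule and the query text are illustrative; the real connector sizes its partitions from Cassandra's system tables rather than splitting evenly.

```scala
// Sketch of token-range handling (illustration only, not the connector's
// real logic): a node's token range is split into Spark partitions, and
// each partition is read with a token-bounded CQL query.
case class TokenRange(start: Long, end: Long)

// Split one token range into n roughly equal Spark partitions
def splitRange(r: TokenRange, n: Int): Seq[TokenRange] = {
  val width = (r.end - r.start) / n
  (0 until n).map { i =>
    val s = r.start + i * width
    val e = if (i == n - 1) r.end else s + width
    TokenRange(s, e)
  }
}

// The token-bounded query one partition would issue for its slice
def rangeQuery(table: String, r: TokenRange): String =
  s"SELECT * FROM $table WHERE token(pk) > ${r.start} AND token(pk) <= ${r.end}"
```

Because every query is bounded by token range, each Executor reads only data that is local to its node, which is what makes the Worker's "I will only analyze 25% of the data" claim hold.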
- 34. Spark Connector: what you gain (Cassandra alone vs. Cassandra + Spark):
      Joins and Unions: No → Yes
      Transformations: Limited → Yes
      Outside Data Integration: No → Yes
      Aggregations: Limited → Yes
- 35. Type mapping (CQL type → Scala type):
      ascii → String
      bigint → Long
      boolean → Boolean
      counter → Long
      decimal → BigDecimal, java.math.BigDecimal
      double → Double
      float → Float
      inet → java.net.InetAddress
      int → Int
      list → Vector, List, Iterable, Seq, IndexedSeq, java.util.List
      map → Map, TreeMap, java.util.HashMap
      set → Set, TreeSet, java.util.HashSet
      text, varchar → String
      timestamp → Long, java.util.Date, java.sql.Date, org.joda.time.DateTime
      timeuuid → java.util.UUID
      uuid → java.util.UUID
      varint → BigInt, java.math.BigInteger
      Nullable values map to Option.
- 36. Let's go code diving!
- 37. Behind the scenes…
      // Videos by id
      CREATE TABLE videos (
          videoid uuid,
          userid uuid,
          name text,
          description text,
          location text,
          location_type int,
          preview_image_location text,
          tags set<text>,
          added_date timestamp,
          PRIMARY KEY (videoid)
      );
      // Index for tag keywords: a possible hand-maintained index, but not a great idea
      CREATE TABLE videos_by_tag (
          tag text,
          videoid uuid,
          added_date timestamp,
          userid uuid,
          name text,
          preview_image_location text,
          tagged_date timestamp,
          PRIMARY KEY (tag, videoid)
      );
- 38. // Videos by id
      CREATE TABLE videos (
          videoid uuid,
          userid uuid,
          name text,
          description text,
          location text,
          location_type int,
          preview_image_location text,
          tags set<text>,
          added_date timestamp,
          PRIMARY KEY (videoid)
      );
      Which column gets its own lookup table next? This? And this? And this?
- 39. 1) Spin up a new C* cluster with search enabled using the DSE installer: $ sudo service dse cassandra -s
      2) Run your schema DDL to create the C* keyspace and tables.
      3) Run dsetool on the videos table: $ dsetool create_core killrvideo.videos generateResources=true
      4) Use the Solr Admin UI to check sanity and make sure you have a core.
      5) Write a CQL query with a Solr search in it:
         SELECT * FROM killrvideo.videos WHERE solr_query='{ "q": "{!edismax qf=\"name^2 tags^1 description\"}?" }';
- 40. Search all of the things in 5 easy steps…
- 41. Now you get this! SELECT name FROM videos WHERE solr_query = 'tags:crime*';
- 42. Attaching to Spark and Cassandra
      // Import Cassandra-specific functions on SparkContext and RDD objects
      import org.apache.spark.{SparkContext, SparkConf}
      import com.datastax.spark.connector._

      /** The setMaster("local[*]") lets us run & test the job right in our IDE */
      val conf = new SparkConf(true)
        .set("spark.cassandra.connection.host", "127.0.0.1")
        .setMaster("local[*]")
        .setAppName(getClass.getName)
        // Optionally:
        // .set("spark.cassandra.auth.username", "cassandra")
        // .set("spark.cassandra.auth.password", "cassandra")

      val sc = new SparkContext(conf)
- 43. Comment table example
      CREATE TABLE comments_by_video (
          videoid uuid,
          commentid timeuuid,
          userid uuid,
          comment text,
          PRIMARY KEY (videoid, commentid)
      ) WITH CLUSTERING ORDER BY (commentid DESC);
- 44. Simple example
      /** keyspace & table */
      val tableRDD = sc.cassandraTable("killrvideo", "comments_by_video")

      /** get a simple count of all the rows in the comments_by_video table */
      val rowCount = tableRDD.count()

      println(s"Total Rows in Comments Table: $rowCount")
      sc.stop()
- 45. The same example, under the hood: the Spark Connector turns the table into a Spark RDD of Spark Partitions, and each Executor reads its slice with SELECT * FROM killrvideo.comments_by_video
- 46. Using CQL
      SELECT userid FROM comments_by_video
      WHERE videoid = 01860584-de45-018f-12be-5f81704e8033

      val cqlRDD = sc.cassandraTable("killrvideo", "comments_by_video")
        .select("userid")
        .where("videoid = ?", "01860584-de45-018f-12be-5f81704e8033")
- 47. Even SQL!
      spark-sql> SELECT cast(videoid as String) videoid, count(*) c
                 FROM comments_by_video
                 GROUP BY cast(videoid as String)
                 ORDER BY c DESC LIMIT 10;
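The same top-N aggregation can be written against plain Scala collections, which makes the shape of the computation explicit: group by videoid, count, order by count descending, take 10. This is a sketch of the logic only, not how SparkSQL executes it.

```scala
// "Top commented videos" in plain Scala (illustration of the aggregation,
// not SparkSQL): comments are (videoid, comment) pairs.
def topVideos(comments: Seq[(String, String)], n: Int): Seq[(String, Int)] =
  comments
    .groupBy { case (videoid, _) => videoid }   // group comments per video
    .map { case (videoid, cs) => (videoid, cs.size) } // count per video
    .toSeq
    .sortBy { case (_, count) => -count }       // highest count first
    .take(n)                                    // LIMIT n
```

In SparkSQL the same steps happen distributed: each Executor pre-aggregates its partitions, then the counts are shuffled and merged before the final ordering.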
- 48. Saving back to Cassandra
      // Create insert data
      val collection = sc.parallelize(Seq(
        ("01860584-de45-018f-12be-5f81704e8033", "Great video", "cdaf6bd5-8914-29e0-f0b6-8b0bc6156777"),
        ("01860584-de45-018f-12be-5f81704e8033", "Hated it", "cdaf6bd5-8914-29e0-f0b6-8b0bc6156777")))

      // Insert data into table
      collection.saveToCassandra("killrvideo", "comments_by_video",
        SomeColumns("videoid", "comment", "userid"))
- 49. Search and analytics together: push a Solr query down into the table scan
      val solrQueryRDD = sc.cassandraTable("killrvideo", "videos")
        .select("name")
        .where("solr_query='tags:crime*'")

      solrQueryRDD.collect().foreach(row => println(row.getString("name")))
- 50. One cluster, three workloads: C* for OLTP, C*/Solr for Search, C*/Spark for Analytics
- 51. Resources
      www.killrvideo.com
      https://github.com/LukeTillman/killrvideo-csharp
      www.datastax.com
- 52. Thank You