Illustration Image

Cassandra.Link

The best knowledge base on Apache Cassandra®

Helping platform leaders, architects, engineers, and operators build scalable real time data platforms.

7/17/2022

Reading time:24 mins

DataStax Bulk Loader Pt. 1 — Introduction and Loading | Datastax

by John Doe

The DataStax Bulk Loader, dsbulk, is a new bulk loading utility introduced in DSE 6 (To download the DataStax Bulk Loader click here).  It solves the task of efficiently loading data into DataStax Enterprise, as well as efficiently unloading data from DSE and counting the data in DSE, all without having to write any custom code or using other components, such as Apache Spark.  In addition to the bulk load and bulk unload use cases, dsbulk aids in migrating data to a new schema and migrating data from other DSE systems or from other data systems. There is a good high-level blog post that discusses the benefits of dsbulk:Easy to use.Able to support common incoming data formats.Able to export data to common outgoing data formats.Able to support multiple field formats, such as dates and times.Able to support all the DSE data types, including user-defined types.Able to support advanced security configurations.Able to gracefully handle badly parsed data and database insertion errors.Able to report on the status and completion of loading tasks, including summary statistics (such as the load rate).Efficient and fast.Now, I’m a person who learns by example, so what I’m going to do in this series of blog posts is show some of the ways to use dsbulk to do some common tasks.  For the documentation on dsbulk, including all of the parameters and options, see the documentation pages for dsbulk.$ cqlsh -e "SELECT COUNT(*) FROM dsbulkblog.iris_with_id;"count-------150(1 rows)Warnings :Aggregation query used without partition keySetupFor these examples, we are going to use a few tables, so let’s start by creating them:cqlsh -e "CREATE KEYSPACE dsbulkblog WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};"cqlsh -e "CREATE TABLE dsbulkblog.iris_with_id (id int PRIMARY KEY, petal_length double, petal_width double, sepal_length double, sepal_width double, species text);"cqlsh -e "CREATE TABLE dsbulkblog.categories_set(category TEXT PRIMARY KEY, examples SET);"cqlsh -e "CREATE TABLE dsbulkblog.categories_list(category TEXT PRIMARY KEY, examples LIST);"cqlsh -e "CREATE TABLE dsbulkblog.president_birthdates(president TEXT PRIMARY KEY, birthdate_string TEXT, birthdate DATE);"The data files are hosted in a public gist here. You will want to download the individual files to your local machine. For simplicity, let’s say that you download them to /tmp/dsbulkblog. Then you should have these files:$ ls -lh /tmp/dsbulkblog/*-rw-rw-r-- 1 digger digger 5.0K Jun 13 14:55 /tmp/dsbulkblog/iris.csv-rw-rw-r-- 1 digger digger 5.0K Jun 13 14:55 /tmp/dsbulkblog/iris_no_header.csv-rw-rw-r-- 1 digger digger 5.0K Jun 13 14:56 /tmp/dsbulkblog/iris_with_comment.csv-rw-rw-r-- 1 digger digger 3.1K Jun 13 14:56 /tmp/dsbulkblog/iris_with_nulls.csv-rw-rw-r-- 1 digger digger 3.7K Jun 13 14:56 /tmp/dsbulkblog/iris_with_null_string.csv-rw-rw-r-- 1 digger digger 4.5K Jun 13 14:57 /tmp/dsbulkblog/iris_without_id.csv-rw-rw-r-- 1 digger digger 289 Jun 13 14:57 /tmp/dsbulkblog/president_birthdates.psv-rw-rw-r-- 1 digger digger 562 Jun 13 14:57 /tmp/dsbulkblog/sportsteams.csv$ wc /tmp/dsbulkblog/* 151 151 5101 /tmp/dsbulkblog/iris.csv150 150 5040 /tmp/dsbulkblog/iris_no_header.csv152 155 5120 /tmp/dsbulkblog/iris_with_comment.csv151 151 3101 /tmp/dsbulkblog/iris_with_nulls.csv151 151 3701 /tmp/dsbulkblog/iris_with_null_string.csv151 151 4608 /tmp/dsbulkblog/iris_without_id.csv7 26 289 /tmp/dsbulkblog/president_birthdates.psv6 61 562 /tmp/dsbulkblog/sportsteams.csv919 996 27522 totalNow we are ready for the examples!Loading ExamplesExample 1: SimpleThe first example will be the most simple. Let’s load the iris.csv data into the dsbulkblog.iris_with_id table:$ dsbulk load -url /tmp/dsbulkblog/iris.csv -k dsbulkblog -t iris_with_idBefore we look at the output and the results, let’s break apart this command-line:load:  After the dsbulk command, we have load.  There are currently (as of dsbulk 1.2) three modes for dsbulk, load, unload, and count.  As you might have guessed, load is used for loading data from files into DSE, and unload is used for exporting data in DSE to files.  The count mode will count the data in DSE and report various metrics.-url: Next up is the -url parameter.  This is the file name or location of the file (or resource, such as an HTML URL) to be loaded.  It can be a single file, a directory of files, a URL (such as https://gist.github.com/brianmhess/8864cf0cb0ce9ea1fd64e579e9f41100/raw/...), or stdin (which is the default).  Here, we are pointing to the /tmp/dsbulkblog/iris.csv file that we downloaded.-k: Next is the -k parameter, which indicates the keyspace to use.  -t: Then comes the -t parameter, which indicates the table to use.   We set -k to dsbulkblog and -t to iris_with_id, which means we will be loading into the dsbulkblog.iris_with_id table.The output when we run the command is as follows:$ dsbulk load -url /tmp/dsbulkblog/iris.csv -k dsbulkblog -t iris_with_idOperation directory: /tmp/logs/LOAD_20190314-161940-640593total | failed | rows/s | mb/s | kb/row | p50ms | p99ms | p999ms | batches150 | 0 | 329 | 0.02 | 0.05 | 9.84 | 39.58 | 45.88 | 1.00Operation LOAD_20190314-161940-640593 completed successfully in 0 seconds.Last processed positions can be found in positions.txtWe can see that dsbulk has written some information to stderr. Let me call out the table of final stats, which indicate that a total of 150 records, all of which were successful, were written. The column for failed is the number of rows that did not successfully load. The remaining statistics are throughput and latency metrics for the load. We can verify that the database has 150 records with a simple COUNT query (Note: We are doing this count operation via cqlsh for illustrative purposes here. We will discuss dsbulk’s count operation later.):So, there was a lot of other information in those messages, including information about the log directory.  I ran this command in /tmp, and by default dsbulk will create a subdirectory named logs in the local directory in which the command was run, and then create a subdirectory of that logs directory for each run of dsbulk.  The load runs will have a subdirectory that begins LOAD_, while the unload runs will have a subdirectory that begins with UNLOAD_, and the count runs will have a subdirectory that begins with COUNT_.  From the output we see:Operation directory: /tmp/logs/LOAD_20190314-155621-436426.A keen eye will notice that the filename includes the year-month-day-hour-minute-second at the time of when dsbulk was run.  This log directory includes two files: operation.log and positions.txt.  The operations.log file is the main log file and contains the settings used, and the main log of operations.  The positions.txt file is used to pick up a partial load from where the last run left off.  If there were other errors (we’ll see some later), we would see them in other files in this directory.Example 2: Loading from stdindsbulk was designed to work like other command-line tools and load data from stdin and unload data to stdout.  In fact, the default -url is stdin for loading and stdout for unloading.For example:$ cat /tmp/dsbulkblog/iris.csv | sed 's/Iris-//g' | dsbulk load -k dsbulkblog -t iris_with_idWe can check that the data does not have the Iris- prefix with a quick SELECT query:$ cqlsh -e "SELECT * FROM dsbulkblog.iris_with_id LIMIT 3;"id | petal_length | petal_width | sepal_length | sepal_width | species-----+--------------+-------------+--------------+-------------+------------23 | 1.7 | 0.5 | 5.1 | 3.3 | setosa114 | 5.1 | 2.4 | 5.8 | 2.8 | virginica53 | 4 | 1.3 | 5.5 | 2.3 | versicolor(3 rows)We could also add a header to data that has no header. For example, the iris_no_header.csv has not header line, but otherwise is identical to the iris.csv file. We could use awk to prepend a header line, such as:$ cat /tmp/dsbulkblog/iris_no_header.csv | awk 'BEGIN{print "sepal_length,sepal_width,petal_length,petal_width,species,id"} {print $0}' | dsbulk load -k dsbulkblog -t iris_with_idThis is one way.  There are others.We could also add missing columns using command-line tools.  For example, the iris_without_id.csv file is just like iris.csv, but without an id column.  Since id is part of the primary key, we need to create that column.  We could do that again with awk:$ cat /tmp/dsbulkblog/iris_without_id.csv | awk 'BEGIN{id=-1} {if (id == -1){printf("%s,id\n", $0);} else {printf("%s,%d\n", $0, id);} id++;}' | dsbulk load -k dsbulkblog -t iris_with_idOkay, it’s clear that I like awk - and sed, and cut, and … - but you can use whatever command-line tools you want: perl, python, etc.  Or you can write your own application using whatever language you want - Java, C, etc - that writes to stdout.Example 2.2: Loading from a URLdsbulk will also load from a URL, such as the HTTP address for the iris.csv example:$ dsbulk load -k dsbulkblog -t iris_with_id -url https://gist.github.com/brianmhess/8864cf0cb0ce9ea1fd64e579e9f41100/raw/522a1f564a381d7eacf6955c490bb6331d4369b2/iris.csvExample 2.3: Loading a directory of filesdsbulk can also load a directory of files. Let’s create a directory and pop the iris.csv file in it:$ mkdir /tmp/dsbulkblog/iris $ cp /tmp/dsbulkblog/iris.csv /tmp/dsbulkblog/irisNow we can load the files in /tmp/dsbulkblog/iris using:$ dsbulk load -url /tmp/dsbulkblog/iris -k dsbulkblog -t iris_with_idExample 2.4: Loading only some files in a directoryWe can load only some of the files in the directory, too. Let’s put the president_birthdates.psv file in the /tmp/dsbulkblog/iris directory, and then tell dsbulk to only load the .csv files:$ cp /tmp/dsbulkblog/president_birthdates.psv /tmp/dsbulkblog/irisWe will tell dsbulk to only load the *.csv files by specifying --connector.csv.fileNamePattern parameter (**/*.csv is the default for CSV files, but we do it for illustration; see here for more information on the filename patterns.):$ dsbulk load -url /tmp/dsbulkblog/iris -k dsbulkblog -t iris_with_id --connector.csv.fileNamePattern "**/*.csv"Example 3: MappingsNow, how did dsbulk know which input fields mapped to which columns of the dsbulkblog.iris_with_id table? Well, it looked at the first line, or the header line, of the input. The first line of the /tmp/dsbulkblog/iris.csv file is:sepal_length,sepal_width,petal_length,petal_width,species,idIt just so happens that these column names match exactly the column names of dsbulkblog.iris_with_id. dsbulk uses this header line by default to learn how to match up the input fields to the column names. We got lucky that they match.So, what would we do if the column names did not match? Well, we could explicitly list the mapping from the input fields to the column names. There are a few ways to do this. The first would be to list the mapping from what the file has to what the table has. For example:$ dsbulk load -url /tmp/dsbulkblog/iris.csv -k dsbulkblog -t iris_with_id -m "sepal_length = sepal_length, sepal_width = sepal_width, petal_length = petal_length, petal_width = petal_width, species = species, id = id"Now, this example is a little silly because we have the same field names in the header as the column names. To borrow a bit from the example above where we provided the header, let’s provide a different header and then use the mapping:$ cat /tmp/dsbulkblog/iris_no_header.csv | awk 'BEGIN{print"sepal_l,sepal_w,petal_l,petal_w,species_name,id_number"} {print $0}' | dsbulk load -k dsbulkblog -t iris_with_id -m "sepal_l = sepal_length, sepal_w = sepal_width, petal_l = petal_length, petal_w = petal_width, species_name = species, id_number = id"As you can see, the -m takes a list of mappings from input field names, as specified in the header, to column names, as specified in the database schema. So, the input field named sepal_l gets loaded to the column named sepal_length.There is one handy shortcut that you can send in to the -m, namely *=*. This says that all names that match should just be matched (and is the default mapping if none is provided). So, these two are the same:$ dsbulk load -url /tmp/dsbulkblog/iris.csv -k dsbulkblog -t iris_with_id -m "sepal_length = sepal_length, sepal_width = sepal_width, petal_length = petal_length, petal_width = petal_width, species = species, id = id"and$ dsbulk load -url /tmp/dsbulkblog/iris.csv -k dsbulkblog -t iris_with_id -m "*=*"We could also specify that we should load all the columns that match, but exclude a column. For example, if we did not want to load the species we could do:$ dsbulk load -url /tmp/dsbulkblog/iris.csv -k dsbulkblog -t iris_with_id -m "* = -species"And if we didn’t want to load the species or the petal_length we could do:$ dsbulk load -url /tmp/dsbulkblog/iris.csv -k dsbulkblog -t iris_with_id -m "* = [-species, -petal_length]"Example 3.1: Mapping by indexAnother way to specify mappings is by index, instead of name. So, consider the iris_no_header.csv file, which does not have a header line. We could do this by mapping the index of the input to the columns:$ dsbulk load -url /tmp/dsbulkblog/iris_no_header.csv -k dsbulkblog -t iris_with_id -header false -m "0=sepal_length, 1=sepal_width, 2=petal_length, 3=petal_width, 4=species, 5=id"Example 3.2: Mapping as a listWe could also tell dsbulk to ignore the header line, and replace it with a specific list of what header line we would like dsbulk to pretend it read:$ dsbulk load -url /tmp/dsbulkblog/iris.csv -k dsbulkblog -t iris_with_id -m "sepal_length,sepal_width,petal_length,petal_width,species,id"This will map the columns in order. That is, this is saying that the first field maps to the sepal_length column, the second maps to the sepal_width column, etc. If the input file has a header with field names field1, field2, etc, then this is equivalent to:$ dsbulk load -url /tmp/dsbulkblog/iris.csv -k dsbulkblog -t iris_with_id -m "field1=sepal_length,field2=sepal_width,field3=petal_length,field4=petal_width,field5=species,field6=id"Similarly, if the input file has no header, then this is equivalent to:$ dsbulk load -url /tmp/dsbulkblog/iris.csv -k dsbulkblog -t iris_with_id -header false -m "0=sepal_length,1=sepal_width,2=petal_length,3=petal_width,4=species,5=id"Example 3.3: Skipping recordsNotice that we don’t actually set -header to false. If we do that, then we need to skip the first line, otherwise we will try to load that line as data. We can do that with the -skipRecords parameter:$ dsbulk load -url /tmp/dsbulkblog/iris.csv -k dsbulkblog -t iris_with_id -m "sepal_length,sepal_width,petal_length,petal_width,species,id" -header false -skipRecords 1We could skip the header line as well as say the first 100 rows of data, loading only the last 50:$ dsbulk load -url /tmp/dsbulkblog/iris.csv -k dsbulkblog -t iris_with_id -m "sepal_length,sepal_width,petal_length,petal_width,species,id" -header false -skipRecords 101We see that we loaded 50 rows:total | failed | rows/s | mb/s | kb/row | p50ms | p99ms | p999ms | batches   50 |      0 | 122 | 0.01 |   0.05 | 10.13 | 23.33 |  23.33 | 1.00Example 3.4: Loading only some recordsWhile we are on the subject of skipping records, we should also talk about just loading some records. We can limit the load to the first 20 records using the -maxRecords parameter:$ dsbulk load -url /tmp/dsbulkblog/iris.csv -k dsbulkblog -t iris_with_id -maxRecords 20We see that we loaded 20 records:total | failed | rows/s | mb/s | kb/row | p50ms | p99ms | p999ms | batches 20 | 0 | 53 | 0.00 | 0.05 | 5.86 | 12.26 | 12.26 | 1.00We can also combine skipping and limiting:$ dsbulk load -url /tmp/dsbulkblog/iris.csv -k dsbulkblog -t iris_with_id -skipRecords 100 -maxRecords 20Example 3.5: Bad mapping or bad parsingNow, what if we did something wrong and forgot a column in the above example. Let’s say we forgot the id column:$ dsbulk load -url /tmp/dsbulkblog/iris.csv -k dsbulkblog -t iris_with_id -header false -m "sepal_length,sepal_width,petal_length,petal_width,species"Operation directory: /tmp/logs/LOAD_20190314-162512-798132Operation LOAD_20190314-162512-798132 failed: Missing required primary key column id from schema.mapping or schema.query.Last processed positions can be found in positions.txtWe see here that we get an error because id is a primary key column, and dsbulk needs to have all primary key columns specified.However, if we include the columns but get the wrong order, such as swapping the id and species columns, then we will get a different error:$ dsbulk load -url /tmp/dsbulkblog/iris.csv -k dsbulkblog -t iris_with_id -m "0=sepal_length,1=sepal_width,2=petal_length,3=petal_width,4=id,5=species"Operation directory: /tmp/logs/LOAD_20190314-162539-255317Operation LOAD_20190314-162539-255317 aborted: Too many errors, the maximum allowed is 100.total | failed | rows/s | mb/s | kb/row | p50ms | p99ms | p999ms | batches  102 |    101 |  0 | 0.00 |  0.00 | 0.00 |  0.00 | 0.00 |   0.00 Rejected records can be found in the following file(s): mapping.badErrors are detailed in the following file(s): mapping-errors.logLast processed positions can be found in positions.txtHere we see that we got more than 100 errors. But what kind of errors? To figure that out, we need to check out the /tmp/logs/LOAD_20190314-162539-255317 files, specifically, the mapping-errors.log file. The beginning of that file looks like:Resource: file:/tmp/dsbulkblog/iris.csvPosition: 129Source: 6.4,2.8,5.6,2.1,Iris-virginica,128\u000ajava.lang.IllegalArgumentException: Could not parse 'Iris-virginica'; accepted formats are: a valid number (e.g. '1234.56'), a valid Java numeric format (e.g. '-123.45e6'), a valid date-time pattern (e.g. '2019-03-14T16:25:41.267Z'), or a valid boolean word        at com.datastax.dsbulk.engine.internal.codecs.util.CodecUtils.parseNumber(CodecUtils.java:119)        at com.datastax.dsbulk.engine.internal.codecs.string.StringToNumberCodec.parseNumber(StringToNumberCodec.java:72)        at com.datastax.dsbulk.engine.internal.codecs.string.StringToIntegerCodec.externalToInternal(StringToIntegerCodec.java:55)        at com.datastax.dsbulk.engine.internal.codecs.string.StringToIntegerCodec.externalToInternal(StringToIntegerCodec.java:26)        at com.datastax.dsbulk.engine.internal.codecs.ConvertingCodec.serialize(ConvertingCodec.java:50)        Suppressed: java.time.format.DateTimeParseException: Text 'Iris-virginica' could not be parsed at index 0                at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)                at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1819)                at com.datastax.dsbulk.engine.internal.codecs.util.SimpleTemporalFormat.parse(SimpleTemporalFormat.java:41)                at com.datastax.dsbulk.engine.internal.codecs.util.ZonedTemporalFormat.parse(ZonedTemporalFormat.java:45)                at com.datastax.dsbulk.engine.internal.codecs.util.CodecUtils.parseNumber(CodecUtils.java:106)                Suppressed: java.lang.NumberFormatException: For input string: "Iris-virginica"                        at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:2043)                        at sun.misc.FloatingDecimal.parseDouble(FloatingDecimal.java:110)                        at java.lang.Double.parseDouble(Double.java:538)                        at java.lang.Double.valueOf(Double.java:502)                        at com.datastax.dsbulk.engine.internal.codecs.util.CodecUtils.parseNumber(CodecUtils.java:101)                        Suppressed: java.lang.NumberFormatException: null                                at java.math.BigDecimal.(BigDecimal.java:494)                                at java.math.BigDecimal.(BigDecimal.java:383)                                at java.math.BigDecimal.(BigDecimal.java:806)                                at com.datastax.dsbulk.engine.internal.codecs.util.CodecUtils.parseNumber(CodecUtils.java:96)                                at com.datastax.dsbulk.engine.internal.codecs.string.StringToNumberCodec.parseNumber(StringToNumberCodec.java:72)                                Suppressed: java.text.ParseException: Invalid number format: Iris-virginica                                        at com.datastax.dsbulk.engine.internal.codecs.util.CodecUtils.parseNumber(CodecUtils.java:247)                                        at com.datastax.dsbulk.engine.internal.codecs.util.CodecUtils.parseNumber(CodecUtils.java:92)                                        at com.datastax.dsbulk.engine.internal.codecs.string.StringToNumberCodec.parseNumber(StringToNumberCodec.java:72)                                        at com.datastax.dsbulk.engine.internal.codecs.string.StringToIntegerCodec.externalToInternal(StringToIntegerCodec.java:55)                                        at com.datastax.dsbulk.engine.internal.codecs.string.StringToIntegerCodec.externalToInternal(StringToIntegerCodec.java:26)We can see that dsbulk is trying to convert Iris-setosa into a number. This is because it believes that this column is the id column, which is an INT. If we look through the file we see the same error over and over again. We’ve clearly messed up the mapping. After 100 errors (that is the default, but it can be overridden by setting -maxErrors) dsbulk quits trying to load.Whenever dsbulk encounters an input line that it cannot parse, it will add that line to the mapping.bad file. This is the input line as it was seen by dsbulk on ingest. If a line or two got garbled or had different format than other lines, dsbulk will populate the mapping.bad file, log the error in mapping-errors.log, but keep going, until it reaches the maximum number of errors. This way, a few bad lines don’t mess up the whole load, and the user can address the few bad lines, either manually inserting them or even running dsbulk on the mapping.bad file with different arguments.For example, to load these bad lines we could load with:$ dsbulk load -url /tmp/logs/LOAD_20190314-162539-255317/mapping.bad -k dsbulkblog -t iris_with_id -header false -m "0=sepal_length,1=sepal_width,2=petal_length,3=petal_width,4=species, 5=id"Operation directory: /tmp/logs/LOAD_20190314-163033-590156 total | failed | rows/s | mb/s | kb/row | p50ms | p99ms | p999ms | batches  101 |      0 | 231 | 0.01 |   0.05 | 8.50 | 17.17 |  18.22 | 1.00Operation LOAD_20190314-163033-590156 completed successfully in 0 seconds.Last processed positions can be found in positions.txtExample 3.6: Missing fields and extra fieldsSometimes we are only loading some of the columns. When the table has more columns than the input, dsbulk will by default throw an error. Now, while it is necessary that all primary key columns be specified, it is allowable to leave other columns undefined or unset.For example, let’s say we didn’t have the sepal_length column defined. We could mimic this with awk, such as:$ cat /tmp/dsbulkblog/iris.csv | awk -F, '{printf("%s,%s,%s,%s,%s\n", $2, $3, $4, $5, $6)}' | dsbulk load -k dsbulkblog -t iris_with_idThis will result in an error because we have not provided the sepal_length column:Operation directory: /tmp/logs/LOAD_20190314-163121-694645At least 1 record does not match the provided schema.mapping or schema.query. Please check that the connector configuration and the schema configuration are correct.Operation LOAD_20190314-163121-694645 aborted: Too many errors, the maximum allowed is 100. total | failed | rows/s | mb/s | kb/row | p50ms | p99ms | p999ms | batches  102 |    101 |  0 | 0.00 |  0.00 | 0.00 |  0.00 | 0.00 |   0.00Rejected records can be found in the following file(s): mapping.badErrors are detailed in the following file(s): mapping-errors.logLast processed positions can be found in positions.txtTo address this, we can add the...--schema.allowMissingFields:$ cat /tmp/dsbulkblog/iris.csv | awk -F, '{printf("%s,%s,%s,%s,%s\n", $2, $3, $4, $5, $6)}' | dsbulk load -k dsbulkblog -t iris_with_id --schema.allowMissingFields trueSimilarly, we could have a situation where the input file has extra columns.$ cat /tmp/dsbulkblog/iris.csv | awk -F, '{printf("%s,%s,%s,%s,%s,%s,extra\n", $1, $2, $3, $4, $5, $6)}' | dsbulk load -k dsbulkblog -t iris_with_idThis will load just fine, and the extra column will be ignored. However, if we wish to be strict about the inputs, we could cause dsbulk to error if there are extra fields using:$ cat /tmp/dsbulkblog/iris.csv | awk -F, '{printf("%s,%s,%s,%s,%s,%s,extra\n", $1, $2, $3, $4, $5, $6)}' | dsbulk load -k dsbulkblog -t iris_with_id --schema.allowExtraFields falseThis will result in an error because we have not said how to map the extra column:Operation directory: /tmp/logs/LOAD_20190314-163305-346514At least 1 record does not match the provided schema.mapping or schema.query. Please check that the connector configuration and the schema configuration are correct.Operation LOAD_20190314-163305-346514 aborted: Too many errors, the maximum allowed is 100. total | failed | rows/s | mb/s | kb/row | p50ms | p99ms | p999ms | batches  102 |    101 |  0 | 0.00 |  0.00 | 0.00 |  0.00 | 0.00 |   0.00Rejected records can be found in the following file(s): mapping.badErrors are detailed in the following file(s): mapping-errors.logLast processed positions can be found in positions.txtExample 3.7: WhitespaceSometimes there is whitespace at the beginning of a text string. We sometimes want this whitespace and sometimes we do not. This is controlled by the --connector.csv.ignoreLeadingWhitespaces parameter, which defaults to false (whitespaces are retained). For example:$ cat /tmp/dsbulkblog/iris.csv | sed "s/Iris/  Iris/g" | dsbulk load -k dsbulkblog -t iris_with_idWe can check that the leading whitespaces are retained via dsbulk’s unload command (which we will discuss in a later blog post):$ dsbulk unload -query "SELECT species FROM dsbulkblog.iris_with_id" | headOperation directory: /tmp/logs/UNLOAD_20190321-144112-777452total | failed | rows/s | mb/s | kb/row | p50ms | p99ms | p999ms150 | 0 | 392 | 0.01 | 0.02 | 9.49 | 41.68 | 41.68Operation UNLOAD_20190321-144112-777452 completed successfully in 0 seconds.speciesIris-setosaIris-setosaIris-virginicaIris-versicolorIris-virginicaIris-versicolorIris-virginicaIris-virginicaIris-virginicaTo strip the leading whitespaces, we can run the load command again with the --connector.csv.ignoreLeadingWhitespaces set to true:$ cat /tmp/dsbulkblog/iris.csv | sed "s/Iris/ Iris/g" | dsbulk load -k dsbulkblog -t iris_with_id --connector.csv.ignoreLeadingWhitespaces trueAgain, we can check this with the dsbulk unload command:$ dsbulk unload -query "SELECT species FROM dsbulkblog.iris_with_id" | headOperation directory: /tmp/logs/UNLOAD_20190321-144510-244786total | failed | rows/s | mb/s | kb/row | p50ms | p99ms | p999ms  150 |      0 | 416 | 0.01 |   0.01 | 8.16 | 36.96 |  36.96Operation UNLOAD_20190321-144510-244786 completed successfully in 0 seconds.speciesIris-setosaIris-setosaIris-virginicaIris-versicolorIris-virginicaIris-versicolorIris-virginicaIris-virginicaIris-virginicaExample 4: Providing the QueryExample 4.1: Providing the query itselfWe can also specify a mapping by providing the actual INSERT statement itself:$ dsbulk load -url /tmp/dsbulkblog/iris.csv -query "INSERT INTO dsbulkblog.iris_with_id(id,petal_width,petal_length,sepal_width,sepal_length,species) VALUES (:id, :petal_width, :petal_length, :sepal_width, :sepal_length, :species)"Notice that we do not need to specify the -k or -t parameters, since it is included in the query itself.Example 4.2: Providing the query with positional mappingWe can also specify a mapping by providing the actual INSERT statement itself:$ dsbulk load -url /tmp/dsbulkblog/iris_no_header.csv -query "INSERT INTO dsbulkblog.iris_with_id(petal_width,petal_length,sepal_width,sepal_length,species,id) VALUES (?,?,?,?,?,?)" -header falseNotice that I needed to move the id column to the end of the list. This is because the order here matters and must match the order in the input data. In our data, the id column is last.Example 4.3: Providing a query with constant valuesWe can also specify a mapping to place constant values into a column. For example, let’s set the species to the same value for all entries:$ dsbulk load -url /tmp/dsbulkblog/iris.csv -query "INSERT INTO dsbulkblog.iris_with_id(id,petal_width,petal_length,sepal_width,sepal_length,species) VALUES (:id, :petal_width, :petal_length, :sepal_width, :sepal_length, 'some kind of iris')"Example 4.4: Deleting data with a queryIt may seem counterintuitive, but we can delete data using the dsbulk load command. Instead of an INSERT statement, we can issue a DELETE statement. For example, let’s delete the rows in our table that correspond to the first 10 lines (11 if you include the header) of the iris.csv file:$ head -11 /tmp/dsbulkblog/iris.csv | dsbulk load -query "DELETE FROM dsbulkblog.iris_with_id WHERE id=:id"Operation directory: /tmp/logs/LOAD_20190320-180959-025572total | failed | rows/s | mb/s | kb/row | p50ms | p99ms | p999ms | batches   10 |      0 | 32 | 0.00 |   0.00 | 4.59 | 5.96 |   5.96 | 1.00Operation LOAD_20190320-180959-025572 completed successfully in 0 seconds.Last processed positions can be found in positions.txtWe can check that those rows have been deleted:$ cqlsh -e "SELECT * FROM dsbulkblog.iris_with_id WHERE id IN (0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15)"id | petal_length | petal_width | sepal_length | sepal_width | species----+--------------+-------------+--------------+-------------+-------------10 |          1.5 | 0.2 |          5.4 | 3.7 | Iris-setosa11 |          1.6 | 0.2 |          4.8 | 3.4 | Iris-setosa12 |          1.4 | 0.1 |          4.8 | 3 | Iris-setosa13 |          1.1 | 0.1 |          4.3 | 3 | Iris-setosa14 |          1.2 | 0.2 |          5.8 | 4 | Iris-setosa15 |          1.5 | 0.4 |          5.7 | 4.4 | Iris-setosa(6 rows)Clearly, we could do all manner of other types of queries here. We could do counter updates, collection updates, and so on.To download the DataStax Bulk Loader click here.To learn additional elements for data loading, read Part 2 of the Bulk Loader series here.

Illustration Image

The DataStax Bulk Loader, dsbulk, is a new bulk loading utility introduced in DSE 6 (To download the DataStax Bulk Loader click here).  It solves the task of efficiently loading data into DataStax Enterprise, as well as efficiently unloading data from DSE and counting the data in DSE, all without having to write any custom code or using other components, such as Apache Spark.  In addition to the bulk load and bulk unload use cases, dsbulk aids in migrating data to a new schema and migrating data from other DSE systems or from other data systems. There is a good high-level blog post that discusses the benefits of dsbulk:

  • Easy to use.
  • Able to support common incoming data formats.
  • Able to export data to common outgoing data formats.
  • Able to support multiple field formats, such as dates and times.
  • Able to support all the DSE data types, including user-defined types.
  • Able to support advanced security configurations.
  • Able to gracefully handle badly parsed data and database insertion errors.
  • Able to report on the status and completion of loading tasks, including summary statistics (such as the load rate).
  • Efficient and fast.

Now, I’m a person who learns by example, so what I’m going to do in this series of blog posts is show some of the ways to use dsbulk to do some common tasks.  For the documentation on dsbulk, including all of the parameters and options, see the documentation pages for dsbulk.

$ cqlsh -e "SELECT COUNT(*) FROM dsbulkblog.iris_with_id;"

count
-------
150

(1 rows)

Warnings :
Aggregation query used without partition key

Setup

For these examples, we are going to use a few tables, so let’s start by creating them:

cqlsh -e "CREATE KEYSPACE dsbulkblog WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};"

cqlsh -e "CREATE TABLE dsbulkblog.iris_with_id (id int PRIMARY KEY, petal_length double, petal_width double, sepal_length double, sepal_width double, species text);"

cqlsh -e "CREATE TABLE dsbulkblog.categories_set(category TEXT PRIMARY KEY, examples SET);"

cqlsh -e "CREATE TABLE dsbulkblog.categories_list(category TEXT PRIMARY KEY, examples LIST);"

cqlsh -e "CREATE TABLE dsbulkblog.president_birthdates(president TEXT PRIMARY KEY, birthdate_string TEXT, birthdate DATE);"

The data files are hosted in a public gist here. You will want to download the individual files to your local machine. For simplicity, let’s say that you download them to /tmp/dsbulkblog. Then you should have these files:

$ ls -lh /tmp/dsbulkblog/*
-rw-rw-r-- 1 digger digger 5.0K Jun 13 14:55 /tmp/dsbulkblog/iris.csv

-rw-rw-r-- 1 digger digger 5.0K Jun 13 14:55 /tmp/dsbulkblog/iris_no_header.csv
-rw-rw-r-- 1 digger digger 5.0K Jun 13 14:56 /tmp/dsbulkblog/iris_with_comment.csv
-rw-rw-r-- 1 digger digger 3.1K Jun 13 14:56 /tmp/dsbulkblog/iris_with_nulls.csv
-rw-rw-r-- 1 digger digger 3.7K Jun 13 14:56 /tmp/dsbulkblog/iris_with_null_string.csv
-rw-rw-r-- 1 digger digger 4.5K Jun 13 14:57 /tmp/dsbulkblog/iris_without_id.csv
-rw-rw-r-- 1 digger digger 289 Jun 13 14:57 /tmp/dsbulkblog/president_birthdates.psv
-rw-rw-r-- 1 digger digger 562 Jun 13 14:57 /tmp/dsbulkblog/sportsteams.csv
$ wc /tmp/dsbulkblog/* 151 151 5101 /tmp/dsbulkblog/iris.csv
150 150 5040 /tmp/dsbulkblog/iris_no_header.csv
152 155 5120 /tmp/dsbulkblog/iris_with_comment.csv
151 151 3101 /tmp/dsbulkblog/iris_with_nulls.csv
151 151 3701 /tmp/dsbulkblog/iris_with_null_string.csv
151 151 4608 /tmp/dsbulkblog/iris_without_id.csv
7 26 289 /tmp/dsbulkblog/president_birthdates.psv
6 61 562 /tmp/dsbulkblog/sportsteams.csv
919 996 27522 total

Now we are ready for the examples!

Loading Examples

Example 1: Simple

The first example will be the most simple. Let’s load the iris.csv data into the dsbulkblog.iris_with_id table:

$ dsbulk load -url /tmp/dsbulkblog/iris.csv -k dsbulkblog -t iris_with_id

Before we look at the output and the results, let’s break apart this command-line:

  • load:  After the dsbulk command, we have load.  There are currently (as of dsbulk 1.2) three modes for dsbulk, load, unload, and count.  As you might have guessed, load is used for loading data from files into DSE, and unload is used for exporting data in DSE to files.  The count mode will count the data in DSE and report various metrics.
  • -url: Next up is the -url parameter.  This is the file name or location of the file (or resource, such as an HTML URL) to be loaded.  It can be a single file, a directory of files, a URL (such as https://gist.github.com/brianmhess/8864cf0cb0ce9ea1fd64e579e9f41100/raw/...), or stdin (which is the default).  Here, we are pointing to the /tmp/dsbulkblog/iris.csv file that we downloaded.
  • -k: Next is the -k parameter, which indicates the keyspace to use.  
  • -t: Then comes the -t parameter, which indicates the table to use.   

We set -k to dsbulkblog and -t to iris_with_id, which means we will be loading into the dsbulkblog.iris_with_id table.

The output when we run the command is as follows:

$ dsbulk load -url /tmp/dsbulkblog/iris.csv -k dsbulkblog -t iris_with_id
Operation directory: /tmp/logs/LOAD_20190314-161940-640593

total | failed | rows/s | mb/s | kb/row | p50ms | p99ms | p999ms | batches

150 | 0 | 329 | 0.02 | 0.05 | 9.84 | 39.58 | 45.88 | 1.00

Operation LOAD_20190314-161940-640593 completed successfully in 0 seconds.
Last processed positions can be found in positions.txt

We can see that dsbulk has written some information to stderr. Let me call out the table of final stats, which indicate that a total of 150 records, all of which were successful, were written. The column for failed is the number of rows that did not successfully load. The remaining statistics are throughput and latency metrics for the load. We can verify that the database has 150 records with a simple COUNT query (Note: We are doing this count operation via cqlsh for illustrative purposes here. We will discuss dsbulk’s count operation later.):

So, there was a lot of other information in those messages, including information about the log directory.  I ran this command in /tmp, and by default dsbulk will create a subdirectory named logs in the local directory in which the command was run, and then create a subdirectory of that logs directory for each run of dsbulk.  The load runs will have a subdirectory that begins LOAD_, while the unload runs will have a subdirectory that begins with UNLOAD_, and the count runs will have a subdirectory that begins with COUNT_.  From the output we see:

Operation directory: /tmp/logs/LOAD_20190314-155621-436426.

A keen eye will notice that the filename includes the year-month-day-hour-minute-second at the time of when dsbulk was run.  This log directory includes two files: operation.log and positions.txt.  The operations.log file is the main log file and contains the settings used, and the main log of operations.  The positions.txt file is used to pick up a partial load from where the last run left off.  If there were other errors (we’ll see some later), we would see them in other files in this directory.

Example 2: Loading from stdin

dsbulk was designed to work like other command-line tools and load data from stdin and unload data to stdout.  In fact, the default -url is stdin for loading and stdout for unloading.

For example:

$ cat /tmp/dsbulkblog/iris.csv | sed 's/Iris-//g' | dsbulk load -k dsbulkblog -t iris_with_id

We can check that the data does not have the Iris- prefix with a quick SELECT query:

$ cqlsh -e "SELECT * FROM dsbulkblog.iris_with_id LIMIT 3;"
id | petal_length | petal_width | sepal_length | sepal_width | species
-----+--------------+-------------+--------------+-------------+------------
23 | 1.7 | 0.5 | 5.1 | 3.3 | setosa
114 | 5.1 | 2.4 | 5.8 | 2.8 | virginica
53 | 4 | 1.3 | 5.5 | 2.3 | versicolor

(3 rows)

We could also add a header to data that has no header. For example, the iris_no_header.csv has not header line, but otherwise is identical to the iris.csv file. We could use awk to prepend a header line, such as:

$ cat /tmp/dsbulkblog/iris_no_header.csv | awk 'BEGIN{print "sepal_length,sepal_width,petal_length,petal_width,species,id"} {print $0}' | dsbulk load -k dsbulkblog -t iris_with_id

This is one way.  There are others.

We could also add missing columns using command-line tools.  For example, the iris_without_id.csv file is just like iris.csv, but without an id column.  Since id is part of the primary key, we need to create that column.  We could do that again with awk:

$ cat /tmp/dsbulkblog/iris_without_id.csv | awk 'BEGIN{id=-1} {if (id == -1)
{printf("%s,id\n", $0);} else {printf("%s,%d\n", $0, id);} id++;}' | dsbulk load -k dsbulkblog -t iris_with_id

Okay, it’s clear that I like awk - and sed, and cut, and … - but you can use whatever command-line tools you want: perl, python, etc.  Or you can write your own application using whatever language you want - Java, C, etc - that writes to stdout.

Example 2.2: Loading from a URL

dsbulk will also load from a URL, such as the HTTP address for the iris.csv example:

$ dsbulk load -k dsbulkblog -t iris_with_id -url https://gist.github.com/brianmhess/8864cf0cb0ce9ea1fd64e579e9f41100/raw/522a1f564a381d7eacf6955c490bb6331d4369b2/iris.csv

Example 2.3: Loading a directory of files

dsbulk can also load a directory of files. Let’s create a directory and pop the iris.csv file in it:

$ mkdir /tmp/dsbulkblog/iris $ cp /tmp/dsbulkblog/iris.csv /tmp/dsbulkblog/iris

Now we can load the files in /tmp/dsbulkblog/iris using:

$ dsbulk load -url /tmp/dsbulkblog/iris -k dsbulkblog -t iris_with_id

Example 2.4: Loading only some files in a directory

We can load only some of the files in the directory, too. Let’s put the president_birthdates.psv file in the /tmp/dsbulkblog/iris directory, and then tell dsbulk to only load the .csv files:

$ cp /tmp/dsbulkblog/president_birthdates.psv /tmp/dsbulkblog/iris

We will tell dsbulk to only load the *.csv files by specifying --connector.csv.fileNamePattern parameter (**/*.csv is the default for CSV files, but we do it for illustration; see here for more information on the filename patterns.):

$ dsbulk load -url /tmp/dsbulkblog/iris -k dsbulkblog -t iris_with_id --connector.csv.fileNamePattern "**/*.csv"

Example 3: Mappings

Now, how did dsbulk know which input fields mapped to which columns of the dsbulkblog.iris_with_id table? Well, it looked at the first line, or the header line, of the input. The first line of the /tmp/dsbulkblog/iris.csv file is:

sepal_length,sepal_width,petal_length,petal_width,species,id

It just so happens that these column names match exactly the column names of dsbulkblog.iris_with_id. dsbulk uses this header line by default to learn how to match up the input fields to the column names. We got lucky that they match.

So, what would we do if the column names did not match? Well, we could explicitly list the mapping from the input fields to the column names. There are a few ways to do this. The first would be to list the mapping from what the file has to what the table has. For example:

$ dsbulk load -url /tmp/dsbulkblog/iris.csv -k dsbulkblog -t iris_with_id -m "sepal_length = sepal_length, sepal_width = sepal_width, petal_length = petal_length, petal_width = petal_width, species = species, id = id"

Now, this example is a little silly because we have the same field names in the header as the column names. To borrow a bit from the example above where we provided the header, let’s provide a different header and then use the mapping:

$ cat /tmp/dsbulkblog/iris_no_header.csv | awk 'BEGIN{print
"sepal_l,sepal_w,petal_l,petal_w,species_name,id_number"} {print $0}' | dsbulk load -k dsbulkblog -t iris_with_id -m "sepal_l = sepal_length, sepal_w = sepal_width, petal_l = petal_length, petal_w = petal_width, species_name = species, id_number = id"

As you can see, the -m takes a list of mappings from input field names, as specified in the header, to column names, as specified in the database schema. So, the input field named sepal_l gets loaded to the column named sepal_length.

There is one handy shortcut that you can send in to the -m, namely *=*. This says that all names that match should just be matched (and is the default mapping if none is provided). So, these two are the same:

$ dsbulk load -url /tmp/dsbulkblog/iris.csv -k dsbulkblog -t iris_with_id -m "sepal_length = sepal_length, sepal_width = sepal_width, petal_length = petal_length, petal_width = petal_width, species = species, id = id"

and

$ dsbulk load -url /tmp/dsbulkblog/iris.csv -k dsbulkblog -t iris_with_id -m "*=*"

We could also specify that we should load all the columns that match, but exclude a column. For example, if we did not want to load the species we could do:

$ dsbulk load -url /tmp/dsbulkblog/iris.csv -k dsbulkblog -t iris_with_id -m "* = -species"

And if we didn’t want to load the species or the petal_length we could do:

$ dsbulk load -url /tmp/dsbulkblog/iris.csv -k dsbulkblog -t iris_with_id -m "* = [-species, -petal_length]"

Example 3.1: Mapping by index

Another way to specify mappings is by index, instead of name. So, consider the iris_no_header.csv file, which does not have a header line. We could do this by mapping the index of the input to the columns:

$ dsbulk load -url /tmp/dsbulkblog/iris_no_header.csv -k dsbulkblog -t iris_with_id -header false -m "0=sepal_length, 1=sepal_width, 2=petal_length, 3=petal_width, 4=species, 5=id"

Example 3.2: Mapping as a list

We could also tell dsbulk to ignore the header line, and replace it with a specific list of what header line we would like dsbulk to pretend it read:

$ dsbulk load -url /tmp/dsbulkblog/iris.csv -k dsbulkblog -t iris_with_id -m "sepal_length,sepal_width,petal_length,petal_width,species,id"

This will map the columns in order. That is, this is saying that the first field maps to the sepal_length column, the second maps to the sepal_width column, etc. If the input file has a header with field names field1, field2, etc, then this is equivalent to:

$ dsbulk load -url /tmp/dsbulkblog/iris.csv -k dsbulkblog -t iris_with_id -m "field1=sepal_length,field2=sepal_width,field3=petal_length,field4=petal_width,field5=species,field6=id"

Similarly, if the input file has no header, then this is equivalent to:

$ dsbulk load -url /tmp/dsbulkblog/iris.csv -k dsbulkblog -t iris_with_id -header false -m "0=sepal_length,1=sepal_width,2=petal_length,3=petal_width,4=species,5=id"

Example 3.3: Skipping records

Notice that we don’t actually set -header to false. If we do that, then we need to skip the first line, otherwise we will try to load that line as data. We can do that with the -skipRecords parameter:

$ dsbulk load -url /tmp/dsbulkblog/iris.csv -k dsbulkblog -t iris_with_id -m "sepal_length,sepal_width,petal_length,petal_width,species,id" -header false -skipRecords 1

We could skip the header line as well as say the first 100 rows of data, loading only the last 50:

$ dsbulk load -url /tmp/dsbulkblog/iris.csv -k dsbulkblog -t iris_with_id -m "sepal_length,sepal_width,petal_length,petal_width,species,id" -header false -skipRecords 101

We see that we loaded 50 rows:

total | failed | rows/s | mb/s | kb/row | p50ms | p99ms | p999ms | batches   

50 |      0 | 122 | 0.01 |   0.05 | 10.13 | 23.33 |  23.33 | 1.00

Example 3.4: Loading only some records

While we are on the subject of skipping records, we should also talk about just loading some records. We can limit the load to the first 20 records using the -maxRecords parameter:

$ dsbulk load -url /tmp/dsbulkblog/iris.csv -k dsbulkblog -t iris_with_id -maxRecords 20

We see that we loaded 20 records:

total | failed | rows/s | mb/s | kb/row | p50ms | p99ms | p999ms | batches 20 | 0 | 53 | 0.00 | 0.05 | 5.86 | 12.26 | 12.26 | 1.00

We can also combine skipping and limiting:

$ dsbulk load -url /tmp/dsbulkblog/iris.csv -k dsbulkblog -t iris_with_id -skipRecords 100 -maxRecords 20

Example 3.5: Bad mapping or bad parsing

Now, what if we did something wrong and forgot a column in the above example. Let’s say we forgot the id column:

$ dsbulk load -url /tmp/dsbulkblog/iris.csv -k dsbulkblog -t iris_with_id -header false -m "sepal_length,sepal_width,petal_length,petal_width,species"
Operation directory: /tmp/logs/LOAD_20190314-162512-798132

Operation LOAD_20190314-162512-798132 failed: Missing required primary key column id from schema.mapping or schema.query.
Last processed positions can be found in positions.txt

We see here that we get an error because id is a primary key column, and dsbulk needs to have all primary key columns specified.

However, if we include the columns but get the wrong order, such as swapping the id and species columns, then we will get a different error:

$ dsbulk load -url /tmp/dsbulkblog/iris.csv -k dsbulkblog -t iris_with_id -m "0=sepal_length,1=sepal_width,2=petal_length,3=petal_width,4=id,5=species"
Operation directory: /tmp/logs/LOAD_20190314-162539-255317

Operation LOAD_20190314-162539-255317 aborted: Too many errors, the maximum allowed is 100.

total | failed | rows/s | mb/s | kb/row | p50ms | p99ms | p999ms | batches  

102 |    101 |  0 | 0.00 |  0.00 | 0.00 |  0.00 | 0.00 |   0.00 Rejected records can be found in the following file(s): mapping.bad

Errors are detailed in the following file(s): mapping-errors.log
Last processed positions can be found in positions.txt

Here we see that we got more than 100 errors. But what kind of errors? To figure that out, we need to check out the /tmp/logs/LOAD_20190314-162539-255317 files, specifically, the mapping-errors.log file. The beginning of that file looks like:

Resource: file:/tmp/dsbulkblog/iris.csv

Position: 129

Source: 6.4,2.8,5.6,2.1,Iris-virginica,128\u000a

java.lang.IllegalArgumentException: Could not parse 'Iris-virginica'; accepted formats are: a valid number (e.g. '1234.56'), a valid Java numeric format (e.g. '-123.45e6'), a valid date-time pattern (e.g. '2019-03-14T16:25:41.267Z'), or a valid boolean word        at com.datastax.dsbulk.engine.internal.codecs.util.CodecUtils.parseNumber(CodecUtils.java:119)        at com.datastax.dsbulk.engine.internal.codecs.string.StringToNumberCodec.parseNumber(StringToNumberCodec.java:72)        at com.datastax.dsbulk.engine.internal.codecs.string.StringToIntegerCodec.externalToInternal(StringToIntegerCodec.java:55)        at com.datastax.dsbulk.engine.internal.codecs.string.StringToIntegerCodec.externalToInternal(StringToIntegerCodec.java:26)        at com.datastax.dsbulk.engine.internal.codecs.ConvertingCodec.serialize(ConvertingCodec.java:50)        Suppressed: java.time.format.DateTimeParseException: Text 'Iris-virginica' could not be parsed at index 0                at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)                at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1819)                at com.datastax.dsbulk.engine.internal.codecs.util.SimpleTemporalFormat.parse(SimpleTemporalFormat.java:41)                at com.datastax.dsbulk.engine.internal.codecs.util.ZonedTemporalFormat.parse(ZonedTemporalFormat.java:45)                at com.datastax.dsbulk.engine.internal.codecs.util.CodecUtils.parseNumber(CodecUtils.java:106)                Suppressed: java.lang.NumberFormatException: For input string: "Iris-virginica"                        at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:2043)                        at sun.misc.FloatingDecimal.parseDouble(FloatingDecimal.java:110)                        at java.lang.Double.parseDouble(Double.java:538)                        at java.lang.Double.valueOf(Double.java:502)                        at com.datastax.dsbulk.engine.internal.codecs.util.CodecUtils.parseNumber(CodecUtils.java:101)                        Suppressed: java.lang.NumberFormatException: null                                at java.math.BigDecimal.(BigDecimal.java:494)                                at java.math.BigDecimal.(BigDecimal.java:383)                                at java.math.BigDecimal.(BigDecimal.java:806)                                at com.datastax.dsbulk.engine.internal.codecs.util.CodecUtils.parseNumber(CodecUtils.java:96)                                at com.datastax.dsbulk.engine.internal.codecs.string.StringToNumberCodec.parseNumber(StringToNumberCodec.java:72)                                Suppressed: java.text.ParseException: Invalid number format: Iris-virginica                                        at com.datastax.dsbulk.engine.internal.codecs.util.CodecUtils.parseNumber(CodecUtils.java:247)                                        at com.datastax.dsbulk.engine.internal.codecs.util.CodecUtils.parseNumber(CodecUtils.java:92)                                        at com.datastax.dsbulk.engine.internal.codecs.string.StringToNumberCodec.parseNumber(StringToNumberCodec.java:72)                                        at com.datastax.dsbulk.engine.internal.codecs.string.StringToIntegerCodec.externalToInternal(StringToIntegerCodec.java:55)                                        at com.datastax.dsbulk.engine.internal.codecs.string.StringToIntegerCodec.externalToInternal(StringToIntegerCodec.java:26)

We can see that dsbulk is trying to convert Iris-setosa into a number. This is because it believes that this column is the id column, which is an INT. If we look through the file we see the same error over and over again. We’ve clearly messed up the mapping. After 100 errors (that is the default, but it can be overridden by setting -maxErrors) dsbulk quits trying to load.

Whenever dsbulk encounters an input line that it cannot parse, it will add that line to the mapping.bad file. This is the input line as it was seen by dsbulk on ingest. If a line or two got garbled or had different format than other lines, dsbulk will populate the mapping.bad file, log the error in mapping-errors.log, but keep going, until it reaches the maximum number of errors. This way, a few bad lines don’t mess up the whole load, and the user can address the few bad lines, either manually inserting them or even running dsbulk on the mapping.bad file with different arguments.

For example, to load these bad lines we could load with:

$ dsbulk load -url /tmp/logs/LOAD_20190314-162539-255317/mapping.bad -k dsbulkblog -t iris_with_id -header false -m "0=sepal_length,1=sepal_width,2=petal_length,3=petal_width,4=species, 5=id"

Operation directory: /tmp/logs/LOAD_20190314-163033-590156 total | failed | rows/s | mb/s | kb/row | p50ms | p99ms | p999ms | batches  

101 |      0 | 231 | 0.01 |   0.05 | 8.50 | 17.17 |  18.22 | 1.00

Operation LOAD_20190314-163033-590156 completed successfully in 0 seconds.

Last processed positions can be found in positions.txt

Example 3.6: Missing fields and extra fields

Sometimes we are only loading some of the columns. When the table has more columns than the input, dsbulk will by default throw an error. Now, while it is necessary that all primary key columns be specified, it is allowable to leave other columns undefined or unset.

For example, let’s say we didn’t have the sepal_length column defined. We could mimic this with awk, such as:

$ cat /tmp/dsbulkblog/iris.csv | awk -F, '{printf("%s,%s,%s,%s,%s\n", $2, $3, $4, $5, $6)}' | dsbulk load -k dsbulkblog -t iris_with_id

This will result in an error because we have not provided the sepal_length column:

Operation directory: /tmp/logs/LOAD_20190314-163121-694645

At least 1 record does not match the provided schema.mapping or schema.query. Please check that the connector configuration and the schema configuration are correct.

Operation LOAD_20190314-163121-694645 aborted: Too many errors, the maximum allowed is 100. total | failed | rows/s | mb/s | kb/row | p50ms | p99ms | p999ms | batches  

102 |    101 |  0 | 0.00 |  0.00 | 0.00 |  0.00 | 0.00 |   0.00

Rejected records can be found in the following file(s): mapping.bad

Errors are detailed in the following file(s): mapping-errors.log

Last processed positions can be found in positions.txt

To address this, we can add the...

--schema.allowMissingFields:

$ cat /tmp/dsbulkblog/iris.csv | awk -F, '{printf("%s,%s,%s,%s,%s\n", $2, $3, $4, $5, $6)}' | dsbulk load -k dsbulkblog -t iris_with_id --schema.allowMissingFields true

Similarly, we could have a situation where the input file has extra columns.

$ cat /tmp/dsbulkblog/iris.csv | awk -F, '{printf("%s,%s,%s,%s,%s,%s,extra\n", $1, $2, $3, $4, $5, $6)}' | dsbulk load -k dsbulkblog -t iris_with_id

This will load just fine, and the extra column will be ignored. However, if we wish to be strict about the inputs, we could cause dsbulk to error if there are extra fields using:

$ cat /tmp/dsbulkblog/iris.csv | awk -F, '{printf("%s,%s,%s,%s,%s,%s,extra\n", $1, $2, $3, $4, $5, $6)}' | dsbulk load -k dsbulkblog -t iris_with_id --schema.allowExtraFields false

This will result in an error because we have not said how to map the extra column:

Operation directory: /tmp/logs/LOAD_20190314-163305-346514

At least 1 record does not match the provided schema.mapping or schema.query. Please check that the connector configuration and the schema configuration are correct.

Operation LOAD_20190314-163305-346514 aborted: Too many errors, the maximum allowed is 100. total | failed | rows/s | mb/s | kb/row | p50ms | p99ms | p999ms | batches  

102 |    101 |  0 | 0.00 |  0.00 | 0.00 |  0.00 | 0.00 |   0.00

Rejected records can be found in the following file(s): mapping.bad

Errors are detailed in the following file(s): mapping-errors.log

Last processed positions can be found in positions.txt

Example 3.7: Whitespace

Sometimes there is whitespace at the beginning of a text string. We sometimes want this whitespace and sometimes we do not. This is controlled by the --connector.csv.ignoreLeadingWhitespaces parameter, which defaults to false (whitespaces are retained). For example:

$ cat /tmp/dsbulkblog/iris.csv | sed "s/Iris/  Iris/g" | dsbulk load -k dsbulkblog -t iris_with_id

We can check that the leading whitespaces are retained via dsbulk’s unload command (which we will discuss in a later blog post):

$ dsbulk unload -query "SELECT species FROM dsbulkblog.iris_with_id" | head
Operation directory: /tmp/logs/UNLOAD_20190321-144112-777452

total | failed | rows/s | mb/s | kb/row | p50ms | p99ms | p999ms

150 | 0 | 392 | 0.01 | 0.02 | 9.49 | 41.68 | 41.68

Operation UNLOAD_20190321-144112-777452 completed successfully in 0 seconds.

species

Iris-setosa
Iris-setosa
Iris-virginica
Iris-versicolor
Iris-virginica
Iris-versicolor
Iris-virginica
Iris-virginica
Iris-virginica

To strip the leading whitespaces, we can run the load command again with the --connector.csv.ignoreLeadingWhitespaces set to true:

$ cat /tmp/dsbulkblog/iris.csv | sed "s/Iris/ Iris/g" | dsbulk load -k dsbulkblog -t iris_with_id --connector.csv.ignoreLeadingWhitespaces true

Again, we can check this with the dsbulk unload command:

$ dsbulk unload -query "SELECT species FROM dsbulkblog.iris_with_id" | head
Operation directory: /tmp/logs/UNLOAD_20190321-144510-244786

total | failed | rows/s | mb/s | kb/row | p50ms | p99ms | p999ms  

150 |      0 | 416 | 0.01 |   0.01 | 8.16 | 36.96 |  36.96

Operation UNLOAD_20190321-144510-244786 completed successfully in 0 seconds.

species
Iris-setosa
Iris-setosa
Iris-virginica
Iris-versicolor
Iris-virginica
Iris-versicolor
Iris-virginica
Iris-virginica
Iris-virginica

Example 4: Providing the Query

Example 4.1: Providing the query itself

We can also specify a mapping by providing the actual INSERT statement itself:

$ dsbulk load -url /tmp/dsbulkblog/iris.csv -query "INSERT INTO dsbulkblog.iris_with_id(id,petal_width,petal_length,sepal_width,sepal_length,species) VALUES (:id, :petal_width, :petal_length, :sepal_width, :sepal_length, :species)"

Notice that we do not need to specify the -k or -t parameters, since it is included in the query itself.

Example 4.2: Providing the query with positional mapping

We can also specify a mapping by providing the actual INSERT statement itself:

$ dsbulk load -url /tmp/dsbulkblog/iris_no_header.csv -query "INSERT INTO dsbulkblog.iris_with_id(petal_width,petal_length,sepal_width,sepal_length,species,id) VALUES (?,?,?,?,?,?)" -header false

Notice that I needed to move the id column to the end of the list. This is because the order here matters and must match the order in the input data. In our data, the id column is last.

Example 4.3: Providing a query with constant values

We can also specify a mapping to place constant values into a column. For example, let’s set the species to the same value for all entries:

$ dsbulk load -url /tmp/dsbulkblog/iris.csv -query "INSERT INTO dsbulkblog.iris_with_id(id,petal_width,petal_length,sepal_width,sepal_length,species) VALUES (:id, :petal_width, :petal_length, :sepal_width, :sepal_length, 'some kind of iris')"

Example 4.4: Deleting data with a query

It may seem counterintuitive, but we can delete data using the dsbulk load command. Instead of an INSERT statement, we can issue a DELETE statement. For example, let’s delete the rows in our table that correspond to the first 10 lines (11 if you include the header) of the iris.csv file:

$ head -11 /tmp/dsbulkblog/iris.csv | dsbulk load -query "DELETE FROM dsbulkblog.iris_with_id WHERE id=:id"
Operation directory: /tmp/logs/LOAD_20190320-180959-025572

total | failed | rows/s | mb/s | kb/row | p50ms | p99ms | p999ms | batches   

10 |      0 | 32 | 0.00 |   0.00 | 4.59 | 5.96 |   5.96 | 1.00

Operation LOAD_20190320-180959-025572 completed successfully in 0 seconds.
Last processed positions can be found in positions.txt

We can check that those rows have been deleted:

$ cqlsh -e "SELECT * FROM dsbulkblog.iris_with_id WHERE id IN (0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15)"

id | petal_length | petal_width | sepal_length | sepal_width | species
----+--------------+-------------+--------------+-------------+-------------
10 |          1.5 | 0.2 |          5.4 | 3.7 | Iris-setosa
11 |          1.6 | 0.2 |          4.8 | 3.4 | Iris-setosa
12 |          1.4 | 0.1 |          4.8 | 3 | Iris-setosa
13 |          1.1 | 0.1 |          4.3 | 3 | Iris-setosa
14 |          1.2 | 0.2 |          5.8 | 4 | Iris-setosa
15 |          1.5 | 0.4 |          5.7 | 4.4 | Iris-setosa

(6 rows)

Clearly, we could do all manner of other types of queries here. We could do counter updates, collection updates, and so on.


To download the DataStax Bulk Loader click here.

To learn additional elements for data loading, read Part 2 of the Bulk Loader series here.

dsbulk DataStax Enterprise

Related Articles

node
hybrid.cloud
datastax

GitHub - IBM/datastax-cassandra-clickstream: Use DataStax Enterprise built on Apache Cassandra as a clickstream database

IBM

12/8/2023

Checkout Planet Cassandra

Claim Your Free Planet Cassandra Contributor T-shirt!

Make your contribution and score a FREE Planet Cassandra Contributor T-Shirt! 
We value our incredible Cassandra community, and we want to express our gratitude by sending an exclusive Planet Cassandra Contributor T-Shirt you can wear with pride.

Join Our Newsletter!

Sign up below to receive email updates and see what's going on with our company

Explore Related Topics

AllKafkaSparkScyllaSStableKubernetesApiGithubGraphQl

Explore Further

cassandra