How do you set the row group size of files in HDFS?
I am running some experiments on block size (dfs.block.size) and row group size (parquet.block.size) in HDFS.
I have a large set of data in HDFS, and I want to replicate the data with various block sizes and row group sizes for testing. I'm able to copy the data with a different block size using:
hdfs dfs -D dfs.block.size=67108864 -D parquet.block.size=67108864 -cp /new_sample_parquet /new_sample_parquet_64M
But only dfs.block.size gets changed. I am verifying with hdfs dfs -stat for the block size, and with parquet-tools meta for the row group size. In fact, if I replace parquet.block.size with blah.blah.blah, it has the same effect. I even went into spark-shell and set the parquet.block.size property manually using sc.hadoopConfiguration.setInt("parquet.block.size", 67108864). I am using Hadoop 3.1.0. I got the property name parquet.block.size from here.
Here are the first 10 rows of the parquet-tools meta output for my attempt:
row group 1: RC:4140100 TS:150147503 OFFSET:4
row group 2: RC:3520100 TS:158294646 OFFSET:59176084
row group 3: RC:880100 TS:80122359 OFFSET:119985867
row group 4: RC:583579 TS:197303521 OFFSET:149394540
row group 5: RC:585594 TS:194850776 OFFSET:213638039
row group 6: RC:2620100 TS:130170698 OFFSET:277223867
row group 7: RC:2750100 TS:136761819 OFFSET:332088066
row group 8: RC:1790100 TS:86766854 OFFSET:389772650
row group 9: RC:2620100 TS:125876377 OFFSET:428147454
row group 10: RC:1700100 TS:83791047 OFFSET:483600973
As you can see, the TS (total size) is far larger than 64 MB (67108864 bytes).
My current theory:
I am doing this in spark-shell:
sc.hadoopConfiguration.setInt("parquet.block.size", 67108864)
val a = spark.read.parquet("my_sample_data")
a.rdd.getNumPartitions // 1034
val s = a.coalesce(27)
s.write.format("parquet").mode("Overwrite").options(Map("dfs.block.size" -> "67108864")).save("/my_new_sample_data")
So perhaps it's because my input data already has 1034 partitions. I'm really not sure. My data has about 118 columns per row.
hadoop hdfs parquet parquet-mr
1 Answer
The parquet.block.size property only affects Parquet writers. The hdfs dfs -cp command, on the other hand, copies files regardless of their contents, so parquet.block.size is simply ignored by hdfs dfs -cp.
Imagine that you have an application that takes screenshots in either JPG or PNG format, depending on a config file, and that you make a copy of those screenshots with the cp command. Naturally, even if you change the desired image format in the config file, cp will always create output files in the image format of the original files, regardless of the config file: the config file is only used by the screenshot application, not by cp. This is how the parquet.block.size property works as well.
What you can do to change the row group size is to rewrite the files. You mentioned that you have spark-shell; use it to rewrite the Parquet data by issuing:
sc.hadoopConfiguration.setInt("parquet.block.size", 67108864) // 64 MB row groups for subsequent Parquet writes
val df = spark.read.parquet("/path/to/input.parquet")
df.write.parquet("/path/to/output")
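If you would rather not touch the global Hadoop configuration, the same property can also be passed per write as a Parquet writer option (the transcript below does the same thing via df.write.options). A minimal sketch, assuming the same df as above and a hypothetical output path:
// Per-write alternative: hand parquet.block.size directly to the Parquet writer.
// A value set this way overrides the one on sc.hadoopConfiguration for this write only.
df.write
  .option("parquet.block.size", "67108864") // 64 MB row groups
  .parquet("/path/to/output_64M")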
Update: Since you mentioned in the comments below that it does not work for you, I ran an experiment and am posting the session transcript below:
$ spark-shell
scala> sc.hadoopConfiguration.setInt("parquet.block.size", 200000)
scala> var df = spark.read.parquet("/tmp/infile.parquet")
df: org.apache.spark.sql.DataFrame = [field0000: binary, field0001: binary ... 78 more fields]
scala> df.write.parquet("/tmp/200K")
scala> df.write.format("parquet").mode("Overwrite").options(Map("parquet.block.size" -> "300000")).save("/tmp/300K")
scala> :quit
$ hadoop fs -copyToLocal /tmp/{200K,300K} /tmp
$ parquet-tools meta /tmp/infile.parquet | grep "row group" | head -n 3
row group 1: RC:4291 TS:5004800 OFFSET:4
row group 2: RC:3854 TS:4499360 OFFSET:5004804
row group 3: RC:4293 TS:5004640 OFFSET:10000000
$ parquet-tools meta /tmp/200K/part-00000-* | grep "row group" | head -n 3
row group 1: RC:169 TS:202080 OFFSET:4
row group 2: RC:168 TS:201760 OFFSET:190164
row group 3: RC:169 TS:203680 OFFSET:380324
$ parquet-tools meta /tmp/300K/part-00000-* | grep "row group" | head -n 3
row group 1: RC:254 TS:302720 OFFSET:4
row group 2: RC:255 TS:303280 OFFSET:284004
row group 3: RC:263 TS:303200 OFFSET:568884
Looking at the TS values, you can see that the input file had a row group size of 4.5-5 MB and the output files have row group sizes of roughly 200 KB and 300 KB, respectively. This shows that the value set using sc.hadoopConfiguration becomes the "default", while the other method you mentioned in a comment below, involving df.options, overrides this default.
Update 2: Now that you have posted your output, I can see what is going on. In your case, compression is taking place, increasing the amount of data that will fit in row groups. The row group size applies to the compressed data, but TS shows the size of uncompressed data. However, you can deduce the size of row groups by subtracting their starting offsets. For example, the compressed size of your first row group is 59176084 - 4 = 59176080 bytes or less (since padding can take place as well). I copied your results into /tmp/rowgroups.dat on my computer and calculated your row group sizes by issuing the following command:
$ cat /tmp/rowgroups.dat | sed 's/.*OFFSET://' | numinterval
59176080
60809783
29408673
64243499
63585828
54864199
57684584
38374804
55453519
(The numinterval command is in the num-utils package on Ubuntu.) As you can see, all of your row groups are smaller than the row group size you specified. (The reason why they are not exactly the specified size is PARQUET-1337.)
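If you do not have num-utils available, the same offset differencing can be done from spark-shell itself. A small sketch, assuming the parquet-tools output was saved to the hypothetical local file /tmp/rowgroups.dat with one "row group" line per row:
// Extract the OFFSET value from each "row group" line and print the difference
// between consecutive offsets, i.e. the on-disk (compressed, possibly padded)
// size of every row group except the last one.
val offsets = scala.io.Source.fromFile("/tmp/rowgroups.dat")
  .getLines()
  .map(_.replaceAll(".*OFFSET:", "").trim.toLong)
  .toSeq
offsets.sliding(2).foreach { case Seq(a, b) => println(b - a) }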
Do you have to set parquet.block.size in the hadoopConfiguration? Is it the same as doing df.write.option("parquet.block.size", "134217728").save("/path")?
– user3685285 Nov 19 at 19:01
It's the same. You can use either. The one specified in write.option overrides the one specified in hadoopConfiguration.
– Zoltan Nov 19 at 19:27
Hm, I tried to set parquet.block.size in sc.hadoop.Configuration in a spark-shell, but when I used parquet-tools to check, it still resulted in much larger row groups than I had specified. Perhaps it has something to do with spark-shell?
– user3685285 Nov 19 at 20:30
It should be sc.hadoopConfiguration, not sc.hadoop.Configuration, but I guess it was just a typo in your comment, as you would have gotten an error message if you had actually used that in spark-shell. It's strange that it does not work. Have you tried the other method that you found (df.write.option) as well?
– Zoltan Nov 19 at 20:40
This is how Spark itself does it in its tests and benchmarks.
– Zoltan Nov 19 at 20:47