How do you set the row group size of files in hdfs?












I am running some experiments on block size (dfs.block.size) and row group size (parquet.block.size) in HDFS.

I have a large set of data in HDFS, and I want to replicate it with various block sizes and row group sizes for testing. I'm able to copy the data with a different block size using:



hdfs dfs -D dfs.block.size=67108864 -D parquet.block.size=67108864 -cp /new_sample_parquet /new_sample_parquet_64M


But only dfs.block.size gets changed. I am verifying with hdfs dfs -stat for the block size and with parquet-tools meta for the row group size. In fact, if I replace parquet.block.size with blah.blah.blah, it has the same effect. I even went into spark-shell and set the parquet.block.size property manually using



sc.hadoopConfiguration.setInt("parquet.block.size", 67108864).


I am using Hadoop 3.1.0. I got the property name parquet.block.size from here.
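For the block-size half of that check, a roughly equivalent probe can also be run from spark-shell with the standard Hadoop FileSystem API (a sketch; the path is the destination of the copy command above):

// Sketch: print the HDFS block size recorded for each copied file,
// analogous to checking with hdfs dfs -stat.
import org.apache.hadoop.fs.{FileSystem, Path}
val fs = FileSystem.get(sc.hadoopConfiguration)
fs.listStatus(new Path("/new_sample_parquet_64M"))   // path assumed from the copy above
  .foreach(st => println(s"${st.getPath.getName}: blockSize=${st.getBlockSize}"))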



Here are the first 10 rows of the parquet-tools meta output for my attempt:



row group 1: RC:4140100 TS:150147503 OFFSET:4
row group 2: RC:3520100 TS:158294646 OFFSET:59176084
row group 3: RC:880100 TS:80122359 OFFSET:119985867
row group 4: RC:583579 TS:197303521 OFFSET:149394540
row group 5: RC:585594 TS:194850776 OFFSET:213638039
row group 6: RC:2620100 TS:130170698 OFFSET:277223867
row group 7: RC:2750100 TS:136761819 OFFSET:332088066
row group 8: RC:1790100 TS:86766854 OFFSET:389772650
row group 9: RC:2620100 TS:125876377 OFFSET:428147454
row group 10: RC:1700100 TS:83791047 OFFSET:483600973


As you can see, the TS (total size) is way larger than 64 MB (67108864 bytes).



My current theory:



I am doing this in spark-shell:



sc.hadoopConfiguration.setInt("parquet.block.size", 67108864)
val a = spark.read.parquet("my_sample_data")
a.rdd.getNumPartitions // 1034
val s = a.coalesce(27)
s.write.format("parquet").mode("Overwrite").options(Map("dfs.block.size" -> "67108864")).save("/my_new_sample_data")


So perhaps it's because my input data already has 1034 partitions. I'm really not sure. My data has about 118 columns per row.










      hadoop hdfs parquet parquet-mr






asked Nov 15 at 17:43 by user3685285, edited Nov 20 at 19:37
          1 Answer
The parquet.block.size property only affects Parquet writers. The hdfs dfs -cp command, on the other hand, copies files regardless of their contents, so parquet.block.size is simply ignored by hdfs dfs -cp.



          Imagine that you have an application that takes screenshots in either JPG or PNG format, depending on a config file. You make a copy of those screenshots with the cp command. Naturally, even if you change the desired image format in the config file, the cp command will always create output files in the image format of the original files, regardless of the config file. The config file is only used by the screenshot application and not by cp. This is how the parquet.block.size property works as well.



What you can do to change the row group size (the Parquet block size) is to rewrite the data. You mentioned that you have spark-shell. Use it to rewrite the Parquet files by issuing



          sc.hadoopConfiguration.setInt("parquet.block.size", 67108864)
          var df = spark.read.parquet("/path/to/input.parquet")
          df.write.parquet("/path/to/output")


Update: Since you mentioned in the comments below that it does not work for you, I ran an experiment and am posting the session transcript below:



          $ spark-shell
          scala> sc.hadoopConfiguration.setInt("parquet.block.size", 200000)
          scala> var df = spark.read.parquet("/tmp/infile.parquet")
          df: org.apache.spark.sql.DataFrame = [field0000: binary, field0001: binary ... 78 more fields]
          scala> df.write.parquet("/tmp/200K")
          scala> df.write.format("parquet").mode("Overwrite").options(Map("parquet.block.size" -> "300000")).save("/tmp/300K")
          scala> :quit
          $ hadoop fs -copyToLocal /tmp/{200K,300K} /tmp
          $ parquet-tools meta /tmp/infile.parquet | grep "row group" | head -n 3
          row group 1: RC:4291 TS:5004800 OFFSET:4
          row group 2: RC:3854 TS:4499360 OFFSET:5004804
          row group 3: RC:4293 TS:5004640 OFFSET:10000000
          $ parquet-tools meta /tmp/200K/part-00000-* | grep "row group" | head -n 3
          row group 1: RC:169 TS:202080 OFFSET:4
          row group 2: RC:168 TS:201760 OFFSET:190164
          row group 3: RC:169 TS:203680 OFFSET:380324
          $ parquet-tools meta /tmp/300K/part-00000-* | grep "row group" | head -n 3
          row group 1: RC:254 TS:302720 OFFSET:4
          row group 2: RC:255 TS:303280 OFFSET:284004
          row group 3: RC:263 TS:303200 OFFSET:568884


By looking at the TS values you can see that the input file had a row group size of 4.5-5M and the output files have row group sizes of 200K and 300K, respectively. This shows that the value set using sc.hadoopConfiguration becomes the "default", while the other method you mentioned in a comment below, involving df.write.options, overrides this default.
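In code, the two ways of specifying the row group size look roughly like this (a sketch; the sizes and the output path are arbitrary examples, and df is the DataFrame read above):

// Sketch: session-wide default vs. per-write override.
sc.hadoopConfiguration.setInt("parquet.block.size", 64 * 1024 * 1024)  // default for Parquet writes in this session
df.write
  .option("parquet.block.size", (32 * 1024 * 1024).toString)           // per-write setting, overrides the default above
  .parquet("/tmp/rowgroup_override_demo")                              // hypothetical output path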



          Update 2: Now that you have posted your output, I can see what is going on. In your case, compression is taking place, increasing the amount of data that will fit in row groups. The row group size applies to the compressed data, but TS shows the size of uncompressed data. However, you can deduce the size of row groups by subtracting their starting offsets. For example, the compressed size of your first row group is 59176084 - 4 = 59176080 bytes or less (since padding can take place as well). I copied your results into /tmp/rowgroups.dat on my computer and calculated your row group sizes by issuing the following command:



          $ cat /tmp/rowgroups.dat | sed 's/.*OFFSET://' | numinterval
          59176080
          60809783
          29408673
          64243499
          63585828
          54864199
          57684584
          38374804
          55453519


          (The numinterval command is in the num-utils package on Ubuntu.) As you can see, all of your row groups are smaller than the row group size you specified. (The reason why they are not exactly the specified size is PARQUET-1337.)
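If numinterval is not available, the same differences can be computed in spark-shell (or any Scala REPL); the offsets below are the ones from your parquet-tools output above:

// Row group starting offsets copied from the question's output.
val offsets = Seq(4L, 59176084L, 119985867L, 149394540L, 213638039L,
  277223867L, 332088066L, 389772650L, 428147454L, 483600973L)
// Compressed size of each row group (except the last) = next offset - this offset.
offsets.sliding(2).collect { case Seq(a, b) => b - a }.foreach(println)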






answered Nov 19 at 13:34 by Zoltan, edited Nov 20 at 22:18
• do you have to set parquet.block.size in the hadoopConfiguration? Is it the same as doing df.write.option("parquet.block.size", "134217728").save("/path")? – user3685285, Nov 19 at 19:01

• It's the same. You can use either. The one specified in write.option overrides the one specified in hadoopConfiguration. – Zoltan, Nov 19 at 19:27

• Hm, I tried to set parquet.block.size in sc.hadoop.Configuration in a spark-shell, but when I used parquet-tools to check, it still resulted in much larger row groups than I had specified. Perhaps it has something to do with spark-shell? – user3685285, Nov 19 at 20:30

• It should be sc.hadoopConfiguration, not sc.hadoop.Configuration, but I guess that was just a typo in your comment, as you would have gotten an error message if you had actually used that in spark-shell. It's strange that it does not work. Have you tried the other method that you found (df.write.option) as well? – Zoltan, Nov 19 at 20:40

• This is how Spark itself does it in its tests and benchmarks. – Zoltan, Nov 19 at 20:47