1. 程式人生 > >【原創】問題定位分享(21)spark執行insert overwrite非常慢,比hive還要慢

【原創】問題定位分享(21)spark執行insert overwrite非常慢,比hive還要慢

最近把一些sql執行從hive改到spark,發現執行更慢,sql主要是一些insert overwrite操作,從執行計劃看到,用到InsertIntoHiveTable

spark-sql> explain insert overwrite table test2 select * from test1;
== Physical Plan ==
InsertIntoHiveTable MetastoreRelation temp, test2, true, false
+- HiveTableScan [buyer_id#20, member_reg_gender#21, reg_birthday#22, reg_birthday1#23, age#24, age_range#25], MetastoreRelation temp, test1
Time taken: 0.404 seconds, Fetched 1 row(s)

跟進程式碼
org.apache.spark.sql.hive.execution.InsertIntoHiveTable

  protected override def doExecute(): RDD[InternalRow] = {
    sqlContext.sparkContext.parallelize(sideEffectResult.asInstanceOf[Seq[InternalRow]], 1)
  }

  /**
   * Inserts all the rows in the table into Hive.  Row objects are properly serialized with the
   * `org.apache.hadoop.hive.serde2.SerDe` and the
   * `org.apache.hadoop.mapred.OutputFormat` provided by the table definition.
   *
   * Note: this is run once and then kept to avoid double insertions.
   
*/ protected[sql] lazy val sideEffectResult: Seq[InternalRow] = { // Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer // instances within the closure, since Serializer is not serializable while TableDesc is. val tableDesc = table.tableDesc val tableLocation
= table.hiveQlTable.getDataLocation val tmpLocation = getExternalTmpPath(tableLocation) val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false) val isCompressed = hadoopConf.get("hive.exec.compress.output", "false").toBoolean if (isCompressed) { // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec", // and "mapred.output.compression.type" have no impact on ORC because it uses table properties // to store compression information. hadoopConf.set("mapred.output.compress", "true") fileSinkConf.setCompressed(true) fileSinkConf.setCompressCodec(hadoopConf.get("mapred.output.compression.codec")) fileSinkConf.setCompressType(hadoopConf.get("mapred.output.compression.type")) } val numDynamicPartitions = partition.values.count(_.isEmpty) val numStaticPartitions = partition.values.count(_.nonEmpty) val partitionSpec = partition.map { case (key, Some(value)) => key -> value case (key, None) => key -> "" } // All partition column names in the format of "<column name 1>/<column name 2>/..." val partitionColumns = fileSinkConf.getTableInfo.getProperties.getProperty("partition_columns") val partitionColumnNames = Option(partitionColumns).map(_.split("/")).getOrElse(Array.empty) // By this time, the partition map must match the table's partition columns if (partitionColumnNames.toSet != partition.keySet) { throw new SparkException( s"""Requested partitioning does not match the ${table.tableName} table: |Requested partitions: ${partition.keys.mkString(",")} |Table partitions: ${table.partitionKeys.map(_.name).mkString(",")}""".stripMargin) } // Validate partition spec if there exist any dynamic partitions if (numDynamicPartitions > 0) { // Report error if dynamic partitioning is not enabled if (!hadoopConf.get("hive.exec.dynamic.partition", "true").toBoolean) { throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg) } // Report error if dynamic partition strict mode is on but no static partition is found if (numStaticPartitions == 0 && hadoopConf.get("hive.exec.dynamic.partition.mode", "strict").equalsIgnoreCase("strict")) { throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg) } // Report error if any static partition appears after a dynamic partition val isDynamic = partitionColumnNames.map(partitionSpec(_).isEmpty) if (isDynamic.init.zip(isDynamic.tail).contains((true, false))) { throw new AnalysisException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg) } } val jobConf = new JobConf(hadoopConf) val jobConfSer = new SerializableJobConf(jobConf) // When speculation is on and output committer class name contains "Direct", we should warn // users that they may loss data if they are using a direct output committer. val speculationEnabled = sqlContext.sparkContext.conf.getBoolean("spark.speculation", false) val outputCommitterClass = jobConf.get("mapred.output.committer.class", "") if (speculationEnabled && outputCommitterClass.contains("Direct")) { val warningMessage = s"$outputCommitterClass may be an output committer that writes data directly to " + "the final location. Because speculation is enabled, this output committer may " + "cause data loss (see the case in SPARK-10063). If possible, please use an output " + "committer that does not have this behavior (e.g. FileOutputCommitter)." logWarning(warningMessage) } val writerContainer = if (numDynamicPartitions > 0) { val dynamicPartColNames = partitionColumnNames.takeRight(numDynamicPartitions) new SparkHiveDynamicPartitionWriterContainer( jobConf, fileSinkConf, dynamicPartColNames, child.output) } else { new SparkHiveWriterContainer( jobConf, fileSinkConf, child.output) } @transient val outputClass = writerContainer.newSerializer(table.tableDesc).getSerializedClass saveAsHiveFile(child.execute(), outputClass, fileSinkConf, jobConfSer, writerContainer) val outputPath = FileOutputFormat.getOutputPath(jobConf) // TODO: Correctly set holdDDLTime. // In most of the time, we should have holdDDLTime = false. // holdDDLTime will be true when TOK_HOLD_DDLTIME presents in the query as a hint. val holdDDLTime = false if (partition.nonEmpty) { if (numDynamicPartitions > 0) { externalCatalog.loadDynamicPartitions( db = table.catalogTable.database, table = table.catalogTable.identifier.table, outputPath.toString, partitionSpec, overwrite, numDynamicPartitions, holdDDLTime = holdDDLTime) } else { // scalastyle:off // ifNotExists is only valid with static partition, refer to // https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-InsertingdataintoHiveTablesfromqueries // scalastyle:on val oldPart = externalCatalog.getPartitionOption( table.catalogTable.database, table.catalogTable.identifier.table, partitionSpec) var doHiveOverwrite = overwrite if (oldPart.isEmpty || !ifNotExists) { // SPARK-18107: Insert overwrite runs much slower than hive-client. // Newer Hive largely improves insert overwrite performance. As Spark uses older Hive // version and we may not want to catch up new Hive version every time. We delete the // Hive partition first and then load data file into the Hive partition. if (oldPart.nonEmpty && overwrite) { oldPart.get.storage.locationUri.foreach { uri => val partitionPath = new Path(uri) val fs = partitionPath.getFileSystem(hadoopConf) if (fs.exists(partitionPath)) { if (!fs.delete(partitionPath, true)) { throw new RuntimeException( "Cannot remove partition directory '" + partitionPath.toString) } // Don't let Hive do overwrite operation since it is slower. doHiveOverwrite = false } } } // inheritTableSpecs is set to true. It should be set to false for an IMPORT query // which is currently considered as a Hive native command. val inheritTableSpecs = true externalCatalog.loadPartition( table.catalogTable.database, table.catalogTable.identifier.table, outputPath.toString, partitionSpec, isOverwrite = doHiveOverwrite, holdDDLTime = holdDDLTime, inheritTableSpecs = inheritTableSpecs) } } } else { externalCatalog.loadTable( table.catalogTable.database, table.catalogTable.identifier.table, outputPath.toString, // TODO: URI overwrite, holdDDLTime) } // Attempt to delete the staging directory and the inclusive files. If failed, the files are // expected to be dropped at the normal termination of VM since deleteOnExit is used. try { createdTempDir.foreach { path => path.getFileSystem(hadoopConf).delete(path, true) } } catch { case NonFatal(e) => logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e) } // un-cache this table. sqlContext.sparkSession.catalog.uncacheTable(table.catalogTable.identifier.quotedString) sqlContext.sessionState.catalog.refreshTable(table.catalogTable.identifier) // It would be nice to just return the childRdd unchanged so insert operations could be chained, // however for now we return an empty list to simplify compatibility checks with hive, which // does not return anything for insert operations. // TODO: implement hive compatibility as rules. Seq.empty[InternalRow] }

insert overwrite 執行分為三步,一個是select,一個是write,一個是load,前邊兩步沒什麼問題,主要是最後一步load,以loadPartition為例看下執行過程:

org.apache.spark.sql.hive.HiveExternalCatalog

  override def loadPartition(
      db: String,
      table: String,
      loadPath: String,
      partition: TablePartitionSpec,
      isOverwrite: Boolean,
      holdDDLTime: Boolean,
      inheritTableSpecs: Boolean): Unit = withClient {
    requireTableExists(db, table)

    val orderedPartitionSpec = new util.LinkedHashMap[String, String]()
    getTable(db, table).partitionColumnNames.foreach { colName =>
      // Hive metastore is not case preserving and keeps partition columns with lower cased names,
      // and Hive will validate the column names in partition spec to make sure they are partition
      // columns. Here we Lowercase the column names before passing the partition spec to Hive
      // client, to satisfy Hive.
      orderedPartitionSpec.put(colName.toLowerCase, partition(colName))
    }

    client.loadPartition(
      loadPath,
      db,
      table,
      orderedPartitionSpec,
      isOverwrite,
      holdDDLTime,
      inheritTableSpecs)
  }

這裡會呼叫HiveClientImpl.loadPartition

org.apache.spark.sql.hive.client.HiveClientImpl

  def loadPartition(
      loadPath: String,
      dbName: String,
      tableName: String,
      partSpec: java.util.LinkedHashMap[String, String],
      replace: Boolean,
      holdDDLTime: Boolean,
      inheritTableSpecs: Boolean): Unit = withHiveState {
    val hiveTable = client.getTable(dbName, tableName, true /* throw exception */)
    shim.loadPartition(
      client,
      new Path(loadPath), // TODO: Use URI
      s"$dbName.$tableName",
      partSpec,
      replace,
      holdDDLTime,
      inheritTableSpecs,
      isSkewedStoreAsSubdir = hiveTable.isStoredAsSubDirectories)
  }

這裡會呼叫Shim_v0_12.loadPartition

org.apache.spark.sql.hive.client.Shim_v0_12

  override def loadPartition(
      hive: Hive,
      loadPath: Path,
      tableName: String,
      partSpec: JMap[String, String],
      replace: Boolean,
      holdDDLTime: Boolean,
      inheritTableSpecs: Boolean,
      isSkewedStoreAsSubdir: Boolean): Unit = {
    loadPartitionMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
      holdDDLTime: JBoolean, inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean)
  }

  private lazy val loadPartitionMethod =
    findMethod(
      classOf[Hive],
      "loadPartition",
      classOf[Path],
      classOf[String],
      classOf[JMap[String, String]],
      JBoolean.TYPE,
      JBoolean.TYPE,
      JBoolean.TYPE,
      JBoolean.TYPE)

這裡會反射呼叫hive的類Hive.loadPartition

org.apache.hadoop.hive.ql.metadata.Hive (1.2版本)

    public void loadPartition(Path loadPath, String tableName, Map<String, String> partSpec, boolean replace, boolean holdDDLTime, boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir, boolean isSrcLocal, boolean isAcid) throws HiveException {
        Table tbl = this.getTable(tableName);
        this.loadPartition(loadPath, tbl, partSpec, replace, holdDDLTime, inheritTableSpecs, isSkewedStoreAsSubdir, isSrcLocal, isAcid);
    }

    public Partition loadPartition(Path loadPath, Table tbl, Map<String, String> partSpec, boolean replace, boolean holdDDLTime, boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir, boolean isSrcLocal, boolean isAcid) throws HiveException {
        Path tblDataLocationPath = tbl.getDataLocation();
        Partition newTPart = null;

        try {
            Partition oldPart = this.getPartition(tbl, partSpec, false);
            Path oldPartPath = null;
            if (oldPart != null) {
                oldPartPath = oldPart.getDataLocation();
            }

            Path newPartPath = null;
            FileSystem oldPartPathFS;
            if (inheritTableSpecs) {
                Path partPath = new Path(tbl.getDataLocation(), Warehouse.makePartPath(partSpec));
                newPartPath = new Path(tblDataLocationPath.toUri().getScheme(), tblDataLocationPath.toUri().getAuthority(), partPath.toUri().getPath());
                if (oldPart != null) {
                    oldPartPathFS = oldPartPath.getFileSystem(this.getConf());
                    FileSystem loadPathFS = loadPath.getFileSystem(this.getConf());
                    if (FileUtils.equalsFileSystem(oldPartPathFS, loadPathFS)) {
                        newPartPath = oldPartPath;
                    }
                }
            } else {
                newPartPath = oldPartPath;
            }

            List<Path> newFiles = null;
            if (replace) {
                replaceFiles(tbl.getPath(), loadPath, newPartPath, oldPartPath, this.getConf(), isSrcLocal);
            } else {
                newFiles = new ArrayList();
                oldPartPathFS = tbl.getDataLocation().getFileSystem(this.conf);
                copyFiles(this.conf, loadPath, newPartPath, oldPartPathFS, isSrcLocal, isAcid, newFiles);
            }

            boolean forceCreate = !holdDDLTime;
            newTPart = this.getPartition(tbl, partSpec, forceCreate, newPartPath.toString(), inheritTableSpecs, newFiles);
            if (!holdDDLTime && isSkewedStoreAsSubdir) {
                org.apache.hadoop.hive.metastore.api.Partition newCreatedTpart = newTPart.getTPartition();
                SkewedInfo skewedInfo = newCreatedTpart.getSd().getSkewedInfo();
                Map<List<String>, String> skewedColValueLocationMaps = this.constructListBucketingLocationMap(newPartPath, skewedInfo);
                skewedInfo.setSkewedColValueLocationMaps(skewedColValueLocationMaps);
                newCreatedTpart.getSd().setSkewedInfo(skewedInfo);
                this.alterPartition(tbl.getDbName(), tbl.getTableName(), new Partition(tbl, newCreatedTpart));
                this.getPartition(tbl, partSpec, true, newPartPath.toString(), inheritTableSpecs, newFiles);
                return new Partition(tbl, newCreatedTpart);
            } else {
                return newTPart;
            }
        } catch (IOException var20) {
            LOG.error(StringUtils.stringifyException(var20));
            throw new HiveException(var20);
        } catch (MetaException var21) {
            LOG.error(StringUtils.stringifyException(var21));
            throw new HiveException(var21);
        } catch (InvalidOperationException var22) {
            LOG.error(StringUtils.stringifyException(var22));
            throw new HiveException(var22);
        }
    }

    protected static void replaceFiles(Path tablePath, Path srcf, Path destf, Path oldPath, HiveConf conf, boolean isSrcLocal) throws HiveException {
        try {
            FileSystem destFs = destf.getFileSystem(conf);
            boolean inheritPerms = HiveConf.getBoolVar(conf, ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);

            FileSystem srcFs;
            FileStatus[] srcs;
            try {
                srcFs = srcf.getFileSystem(conf);
                srcs = srcFs.globStatus(srcf);
            } catch (IOException var20) {
                throw new HiveException("Getting globStatus " + srcf.toString(), var20);
            }

            if (srcs == null) {
                LOG.info("No sources specified to move: " + srcf);
            } else {
                List<List<Path[]>> result = checkPaths(conf, destFs, srcs, srcFs, destf, true);
                if (oldPath != null) {
                    try {
                        FileSystem fs2 = oldPath.getFileSystem(conf);
                        if (fs2.exists(oldPath)) {
                            if (FileUtils.isSubDir(oldPath, destf, fs2)) {
                                FileUtils.trashFilesUnderDir(fs2, oldPath, conf);
                            }

                            if (inheritPerms) {
                                inheritFromTable(tablePath, destf, conf, destFs);
                            }
                        }
                    } catch (Exception var19) {
                        LOG.warn("Directory " + oldPath.toString() + " cannot be removed: " + var19, var19);
                    }
                }

                if (srcs.length == 1 && srcs[0].isDir()) {
                    Path destfp = destf.getParent();
                    if (!destFs.exists(destfp)) {
                        boolean success = destFs.mkdirs(destfp);
                        if (!success) {
                            LOG.warn("Error creating directory " + destf.toString());
                        }

                        if (inheritPerms && success) {
                            inheritFromTable(tablePath, destfp, conf, destFs);
                        }
                    }

                    Iterator i$ = result.iterator();

                    while(i$.hasNext()) {
                        List<Path[]> sdpairs = (List)i$.next();
                        Iterator i$ = sdpairs.iterator();

                        while(i$.hasNext()) {
                            Path[] sdpair = (Path[])i$.next();
                            Path destParent = sdpair[1].getParent();
                            FileSystem destParentFs = destParent.getFileSystem(conf);
                            if (!destParentFs.isDirectory(destParent)) {
                                boolean success = destFs.mkdirs(destParent);
                                if (!success) {
                                    LOG.warn("Error creating directory " + destParent);
                                }

                                if (inheritPerms && success) {
                                    inheritFromTable(tablePath, destParent, conf, destFs);
                                }
                            }

                            if (!moveFile(conf, sdpair[0], sdpair[1], destFs, true, isSrcLocal)) {
                                throw new IOException("Unable to move file/directory from " + sdpair[0] + " to " + sdpair[1]);
                            }
                        }
                    }
                } else {
                    if (!destFs.exists(destf)) {
                        boolean success = destFs.mkdirs(destf);
                        if (!success) {
                            LOG.warn("Error creating directory " + destf.toString());
                        }

                        if (inheritPerms && success) {
                            inheritFromTable(tablePath, destf, conf, destFs);
                        }
                    }

                    Iterator i$ = result.iterator();

                    while(i$.hasNext()) {
                        List<Path[]> sdpairs = (List)i$.next();
                        Iterator i$ = sdpairs.iterator();

                        while(i$.hasNext()) {
                            Path[] sdpair = (Path[])i$.next();
                            if (!moveFile(conf, sdpair[0], sdpair[1], destFs, true, isSrcLocal)) {
                                throw new IOException("Error moving: " + sdpair[0] + " into: " + sdpair[1]);
                            }
                        }
                    }
                }

            }
        } catch (IOException var21) {
            throw new HiveException(var21.getMessage(), var21);
        }
    }

    public static boolean trashFilesUnderDir(FileSystem fs, Path f, Configuration conf) throws FileNotFoundException, IOException {
        FileStatus[] statuses = fs.listStatus(f, HIDDEN_FILES_PATH_FILTER);
        boolean result = true;
        FileStatus[] arr$ = statuses;
        int len$ = statuses.length;

        for(int i$ = 0; i$ < len$; ++i$) {
            FileStatus status = arr$[i$];
            result &= moveToTrash(fs, status.getPath(), conf);
        }

        return result;
    }

hive在執行loadPartition的時候,如果分割槽目錄已經存在,會呼叫replaceFiles,replaceFiles會呼叫trashFilesUnderDir,trashFilesUnderDir裡會逐個將檔案放到回收站;

spark執行loadPartition的時候,直接反射呼叫hive的邏輯,為什麼還會比hive執行慢很多呢?

這時注意到hive用的版本是2.1,spark2.1.1裡依賴的hive版本是1.2,對比hive1.2和hive2.1之間的程式碼發現,確實有差別,以下是hive2.1的程式碼:

org.apache.hadoop.hive.ql.metadata.Hive(2.1版本)

  /**
   * Trashes or deletes all files under a directory. Leaves the directory as is.
   * @param fs FileSystem to use
   * @param f path of directory
   * @param conf hive configuration
   * @param forceDelete whether to force delete files if trashing does not succeed
   * @return true if deletion successful
   * @throws IOException
   */
  private boolean trashFilesUnderDir(final FileSystem fs, Path f, final Configuration conf)
      throws IOException {
    FileStatus[] statuses = fs.listStatus(f, FileUtils.HIDDEN_FILES_PATH_FILTER);
    boolean result = true;
    final List<Future<Boolean>> futures = new LinkedList<>();
    final ExecutorService pool = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ?
        Executors.newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25),
        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Delete-Thread-%d").build()) : null;
    final SessionState parentSession = SessionState.get();
    for (final FileStatus status : statuses) {
      if (null == pool) {
        result &= FileUtils.moveToTrash(fs, status.getPath(), conf);
      } else {
        futures.add(pool.submit(new Callable<Boolean>() {
          @Override
          public Boolean call() throws Exception {
            SessionState.setCurrentSessionState(parentSession);
            return FileUtils.moveToTrash(fs, status.getPath(), conf);
          }
        }));
      }
    }
    if (null != pool) {
      pool.shutdown();
      for (Future<Boolean> future : futures) {
        try {
          result &= future.get();
        } catch (InterruptedException | ExecutionException e) {
          LOG.error("Failed to delete: ",e);
          pool.shutdownNow();
          throw new IOException(e);
        }
      }
    }
    return result;
  }

可以看到在hive2.1裡刪除檔案用到了執行緒池,而在hive1.2裡是在for迴圈裡序列刪除,所以當檔案很多時,hive2.1比hive1.2(即spark2.1.1)就會快非常多;

spark依賴hive的方式是直接反射呼叫,由於hive1.2和hive2.1很多類的方法介面都有調整,很難升級,所以遇到這個問題只能通過修改spark裡Hive.trashFilesUnderDir程式碼,同樣改為執行緒池的方式來刪除檔案,問題解決;