Our data grows by about one terabyte a day. Since the volume keeps increasing and cluster disk space is limited, we moved the cold data to OSS. We still need that cold data occasionally, and pulling it back from OSS every time wastes a lot of time, so we worked out a way to read the data on OSS directly from Spark.

Example:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}

object OssWc {
  def main(args: Array[String]): Unit = {
    if (args.length < 3) {
      System.err.println(
        """
          |Arguments:
          |  inputPath      Input OSS object path, like oss://accessKeyId:accessKeySecret@bucket.endpoint/input/words.txt
          |  outputPath     Output OSS object path, like oss://accessKeyId:accessKeySecret@bucket.endpoint/output
          |  numPartitions  The number of RDD partitions.
          |""".stripMargin)
      System.exit(1)
    }

    Logger.getLogger("org").setLevel(Level.WARN)

    val conf = new SparkConf().setAppName("OssWc") //.setMaster("local[4]")
    // Use the Aliyun native OSS FileSystem implementation so Spark can resolve oss:// paths
    conf.set("spark.hadoop.fs.oss.impl", "com.aliyun.fs.oss.nat.NativeOssFileSystem")
    conf.set("spark.hadoop.mapreduce.job.run-local", "true")
    conf.set("spark.hadoop.fs.oss.accessKeyId", "SLFNEWKBG")
    conf.set("spark.hadoop.fs.oss.accessKeySecret", "SDFSFSS")

    val sc = new SparkContext(conf)

    val inputPath = args(0)
    val outputPath = args(1)
    val numPartitions = args(2).toInt

    val input = sc.textFile(inputPath, numPartitions)
    val output = input.flatMap(_.split(",")).map(x => (x, 1)).reduceByKey(_ + _)
    output.foreach(println)            // print the word counts (handy when testing locally)
    output.saveAsTextFile(outputPath)

    sc.stop()
  }
}

When testing locally, remember to uncomment //.setMaster("local[4]"), otherwise you will get the following error:
2018-01-25 12:04:38 [ main:1 ] - [ ERROR ] org.apache.spark.internal.Logging$class.logError(Logging.scala:91) Error initializing SparkContext.
org.apache.spark.SparkException: A master URL must be set in your configuration
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:379)
    at text.OssWc$.main(OssWc.scala:32)
    at text.OssWc.main(OssWc.scala)
Exception in thread "main" org.apache.spark.SparkException: A master URL must be set in your configuration
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:379)
    at text.OssWc$.main(OssWc.scala:32)
    at text.OssWc.main(OssWc.scala)
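When the job is submitted to the cluster, the master URL is supplied at submit time, so the setMaster line only matters for local runs. Also, as the usage message above hints, the AccessKey does not have to live in SparkConf: it can be embedded directly in the oss:// URI. Below is a minimal sketch of that variant; the bucket name, endpoint, and AccessKey values are placeholders, not real credentials.

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}

// Sketch: credentials embedded in the oss:// URI instead of SparkConf.
// Bucket, endpoint, and AccessKey values below are placeholders.
object OssWcUriAuth {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)

    val conf = new SparkConf().setAppName("OssWcUriAuth") //.setMaster("local[4]") for local tests
    // The OSS FileSystem implementation is still required for oss:// paths
    conf.set("spark.hadoop.fs.oss.impl", "com.aliyun.fs.oss.nat.NativeOssFileSystem")
    val sc = new SparkContext(conf)

    // AccessKey goes inside the URI: oss://accessKeyId:accessKeySecret@bucket.endpoint/path
    val inputPath  = "oss://myAccessKeyId:myAccessKeySecret@my-bucket.oss-cn-hangzhou.aliyuncs.com/input/words.txt"
    val outputPath = "oss://myAccessKeyId:myAccessKeySecret@my-bucket.oss-cn-hangzhou.aliyuncs.com/output"

    sc.textFile(inputPath, 1)
      .flatMap(_.split(","))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .saveAsTextFile(outputPath)

    sc.stop()
  }
}

Keeping the AccessKey out of the code (URI built from arguments, or keys injected via --conf at submit time) also makes it easier to avoid committing credentials to source control.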