如何使用Hadoop读写数据库

如何使用Hadoop读写数据库,第1张

你的意思是需要hadoop读取关系型数据库的数据吧,一般是安装sqoop组件,开源社区也是有一个组件,现在已经到sqoop2了,使用sqoop import或者sqoop export来和关系型数据库互相导数据

Hadoop的Mapper是怎么从HDFS上读取TextInputFormat数据的

Hadoop中控制文件格式,split方式和record读取方式的类都继承自InputFormat这个抽象类。比如实现每次读取文本文件一行的就是TextInputFormat,这个类进一步使用LineRecordReader进行实际的读取 *** 作。以Hadoop 1.0.1为例,在LineRecordReader第97-99行:

newSize = in.readLine(value, maxLineLength,

Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),

maxLineLength))

从文本行读取类LineReader in中读取一行写入record的value中。为了一次读取两行,可以将96-106行的while循环再复制粘贴一份在下面。

但是LineReader的readLine函数执行时会首先将value原来的值清空,但是我们读取第二行时不想将第一行的内容清空。因此对LineReader的readLine函数做一点修改:

为了保留原来的readLine函数,我们首先讲这个函数复制粘贴一份在下面,将readLine的函数声明做一点修改,增加是否clear value的判断:

public int readLine(Text str, int maxLineLength,

int maxBytesToConsume, boolean clear) throws IOException {

然后讲123行的str.clear()修改为if (clear) {str.clear()}

这样,在LineRecordReader的两个while循环中,第一次readLine应为:

newSize = in.readLine(value, maxLineLength,

Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),

maxLineLength), true)

光驱是怎么读取数据的?

你好!很高兴解答你的提问!请你阅读如下提示,排除烦恼。谢谢!

1,光盘放进光盘,夹盘器夹紧光盘,主轴电机带着旋转到一定转数,司服电机推激光头车到近盘心。

2,开激光单元发射激光束照射到光盘反光区最近心圈,探测光盘类型与寻地方式,根据反射回的光

3,束的抖动变化,转换为二进制数据,由译码器变成光驱能读的数据,传递给电脑中央处理器处理。

4,光驱解读出光盘内侧的光盘核心引导数据,确定光盘类型与播放模式,数据头与尾,文件系统。

5,数据排列结构,确定读取光盘形式。从光盘读取到的数据可以是资料,文件,图片,音乐,影片,

6,命令。光盘的微观结构是螺旋形的,象夏天点的蚊香,密密麻麻排列着肉眼看不见的光轨存数据。

7,顺序读取光盘数据时,激光头沿光盘弦线从内到外作直线运动,完成从内到外读取螺旋形光轨。

8,正好跟蚊香燃烧的顺序相反。最重要的核心数据都在光盘近心圈,所以刮伤近心圈,整盘读不出。

9,就是这个原因,因为引导如何读取光盘的数据都在近心圈,外圈基本存储普通数据,损失小点。

10,如果跳来跳去读光盘数据,就会参照光盘近心圈的导引数据,寻址到真实数据存在的光轨位置。

11,然后读取到所需数据,这样激光头小车就会移来移去发出响声。光盘从内圈到外圈的周长不同,

12,中心圈最短,最外圈最长。每一圈可容纳的数据也是如此,近中心最少,最外圈最多,这样会

13,导致光驱读取到数据的速度会变化,如果为了保持读取数据速度的恒定,就得调节主轴电机的

14,转数,读内圈时要转得快,激光头小车也要移得快,外圈要转得慢,激光头小车也要移得慢。

15,所以我们会听到光盘转动的风噪会有变化,尤其是跳来跳去读存在光盘不同区域的数据时,

16,风噪都会听出区别。当光盘有刮伤时,由于有数据损坏缺失,导致光驱读不下去,于是就会

17,跳回来继续读取,实在读不出原数据,就用CRC32数据冗余机制,加速读取上一段与下一段

18,区域的数据,进行纠错或预测判断来填补缺失的数据,于是主轴电机转速更高,反复读取的

19,风噪就更明显,时快时慢,就是光驱试图通过调节速度来读出数据。光盘记录数据宏观来说

20,是光轨,微观来说,每一位数据都是以物理形式记录的。通常激光在平整的光盘染料面烧蚀

21,出一小坑,代表数据1,而没被烧出坑来的地方就是一小个平面,代表数据0,激光从一个固定

22,的角度入射进坑,与入射到平台,激光的反射与折射的角度与轨迹会发生明显不同。最后回

23,到激光头后,会被折射到光敏元件上,由光信号转换为电脉冲信号,传递给光驱里的译码器。

24,译码器再转换为电脑能识别的资料或命令。光轨就是这样由这样密密麻麻排列的凹坑与平台

25,组成。众多的光轨再排列组成光盘。电脑是处理二进制数据的机器,磁盘,U盘无一不是二进

26,制存储数据的。硬盘是利用磁性原理,有磁无磁代表有无数据。磁极排列走向决定数据0或1。

27,内存利用电容器的电位高低来判断数据0或1,有电无电来判断有无数据。数据总线是以脉冲

28,波的波峰波谷来判断数据0或1,没脉冲波没数据,有脉冲波有数据。闪存也是利用存储单元的

29,电位高低来判断数据0或1,有电势有数据,没电势没数据。人脑的高级指令都得译成机器语言。

30,我们以10进制为计算单位,电脑能接受的数据都是二进制的,所以编程员的工作很辛苦且枯燥。

stm32 是怎么读取串口数据的

串口接收中断,接收数据,再判断命令,根据命令的不同来确定下面的 *** 作

*** 作系统是怎么读取硬盘数据的?

首先CPU向内存要数据,内存得到指令后向硬盘要数据,硬盘把数据交给内存,内存再交给CPU处理。

plc的主站、从站是怎么从DIO、AIO模块读取或发送数据的?

以西门子为例:

PPI通信直接将主从的IO或V区进行通信,可以将AIO的数据放在V存储区中做通信。 例如:将主站IB0数据直接传送到从站QB0。或者主站VB0数据传送到从站VB0当中,做数据交流。

MODBUS通信直接通信V存储区。

读取尺子上的数据的读取用日语怎么说

読み出す 日【よみだす】

[动]读出,读取

等于 read out

1〔読み始める〕begin to read

2〔コンピュータ内の记忆データを取り出す〕retrieve ((data))

hadoop mapper类 切割数据的时候怎么根据回车切割

DataTable dt = new DataTable()

dt.Columns.Add(new DataColumn("PreRevDate0", typeof(decimal)))

DataColumn col = new DataColumn()

col.ColumnName = "PreRevDate1"

col.Expression = "ABS(Convert.ToInt32(PreRevDate0))"

col.DataType = typeof(decimal)

dt.Columns.Add(col)

DataRow dr = dt.NewRow()

dr["PreRevDate0"] = -1

dt.Rows.Add(dr)

hadoop默认是读取文件的数据的单位是一行,怎么修改能使得hadoop以两行为单位进行读取数据

Hadoop中控制文件格式,split方式和record读取方式的类都继承自InputFormat这个抽象类。比如实现每次读取文本文件一行的就是TextInputFormat,这个类进一步使用LineRecordReader进行实际的读取 *** 作。以Hadoop 1.0.1为例,在LineRecordReader第97-99行:

newSize = in.readLine(value, maxLineLength,

Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),

maxLineLength))

从文本行读取类LineReader in中读取一行写入record的value中。为了一次读取两行,可以将96-106行的while循环再复制粘贴一份在下面。

但是LineReader的readLine函数执行时会首先将value原来的值清空,但是我们读取第二行时不想将第一行的内容清空。因此对LineReader的readLine函数做一点修改:

为了保留原来的readLine函数,我们首先讲这个函数复制粘贴一份在下面,将readLine的函数声明做一点修改,增加是否clear value的判断:

public int readLine(Text str, int maxLineLength,

int maxBytesToConsume, boolean clear) throws IOException {

然后讲123行的str.clear()修改为if (clear) {str.clear()}

这样,在LineRecordReader的两个while循环中,第一次readLine应为:

newSize = in.readLine(value, maxLineLength,

Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),

maxLineLength), true)

第二次readLine应为:

newSize = in.readLine(value, maxLineLength,

Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),

maxLineLength), false)

搞定。

cpu是怎么读取内存数据

在计算机中CPU是通过数据总线与内存交换数据的.CPU与内存交换数据是通过前端总线完成的,前端总线也是数据总线的一种。

这个flash是怎么读取的数据?

先看一下效果嘛

客户端通过调用FileSystem对象(对应于HDFS文件系统,调用DistributedFileSystem对象)的open()方法来打开文件(也即图中的第一步),DistributedFileSystem通过RPC(Remote Procedure Call)调用询问NameNode来得到此文件最开始几个block的文件位置(第二步)。对每一个block来说,namenode返回拥有此block备份的所有namenode的地址信息(按集群的拓扑网络中与客户端距离的远近排序,关于在Hadoop集群中如何进行网络拓扑请看下面介绍)。如果客户端本身就是一个datanode(如客户端是一个mapreduce任务)并且此datanode本身就有所需文件block的话,客户端便从本地读取文件。

以上步骤完成后,DistributedFileSystem会返回一个FSDataInputStream(支持文件seek),客户端可以从FSDataInputStream中读取数据。FSDataInputStream包装了一个DFSInputSteam类,用来处理namenode和datanode的I/O *** 作。

客户端然后执行read()方法(第三步),DFSInputStream(已经存储了欲读取文件的开始几个block的位置信息)连接到第一个datanode(也即最近的datanode)来获取数据。通过重复调用read()方法(第四、第五步),文件内的数据就被流式的送到了客户端。当读到该block的末尾时,DFSInputStream就会关闭指向该block的流,转而找到下一个block的位置信息然后重复调用read()方法继续对该block的流式读取。这些过程对于用户来说都是透明的,在用户看来这就是不间断的流式读取整个文件。

当真个文件读取完毕时,客户端调用FSDataInputSteam中的close()方法关闭文件输入流(第六步)。

如果在读某个block是DFSInputStream检测到错误,DFSInputSteam就会连接下一个datanode以获取此block的其他备份,同时他会记录下以前检测到的坏掉的datanode以免以后再无用的重复读取该datanode。DFSInputSteam也会检查从datanode读取来的数据的校验和,如果发现有数据损坏,它会把坏掉的block报告给namenode同时重新读取其他datanode上的其他block备份。

这种设计模式的一个好处是,文件读取是遍布这个集群的datanode的,namenode只是提供文件block的位置信息,这些信息所需的带宽是很少的,这样便有效的避免了单点瓶颈问题从而可以更大的扩充集群的规模。

Hadoop中的网络拓扑

在Hadoop集群中如何衡量两个节点的远近呢?要知道,在高速处理数据时,数据处理速率的唯一限制因素就是数据在不同节点间的传输速度:这是由带宽的可怕匮乏引起的。所以我们把带宽作为衡量两个节点距离大小的标准。

但是计算两个节点之间的带宽是比较复杂的,而且它需要在一个静态的集群下才能衡量,但Hadoop集群一般是随着数据处理的规模动态变化的(且两两节点直接相连的连接数是节点数的平方)。于是Hadoop使用了一个简单的方法来衡量距离,它把集群内的网络表示成一个树结构,两个节点之间的距离就是他们离共同祖先节点的距离之和。树一般按数据中心(datacenter),机架(rack),计算节点(datanode)的结构组织。计算节点上的本地运算速度最快,跨数据中心的计算速度最慢(现在跨数据中心的Hadoop集群用的还很少,一般都是在一个数据中心内做运算的)。

假如有个计算节点n1处在数据中心c1的机架r1上,它可以表示为/c1/r1/n1,下面是不同情况下两个节点的距离:

• distance(/d1/r1/n1, /d1/r1/n1) = 0 (processes on the same node)

• distance(/d1/r1/n1, /d1/r1/n2) = 2 (different nodes on the same rack)

• distance(/d1/r1/n1, /d1/r2/n3) = 4 (nodes on different racks in the same data center)

• distance(/d1/r1/n1, /d2/r3/n4) = 6 (nodes in different data centers)

如下图所示:

Hadoop

写文件

现在我们来看一下Hadoop中的写文件机制解析,通过写文件机制我们可以更好的了解一下Hadoop中的一致性模型。

Hadoop

上图为我们展示了一个创建一个新文件并向其中写数据的例子。

首先客户端通过DistributedFileSystem上的create()方法指明一个欲创建的文件的文件名(第一步),DistributedFileSystem再通过RPC调用向NameNode申请创建一个新文件(第二步,这时该文件还没有分配相应的block)。namenode检查是否有同名文件存在以及用户是否有相应的创建权限,如果检查通过,namenode会为该文件创建一个新的记录,否则的话文件创建失败,客户端得到一个IOException异常。DistributedFileSystem返回一个FSDataOutputStream以供客户端写入数据,与FSDataInputStream类似,FSDataOutputStream封装了一个DFSOutputStream用于处理namenode与datanode之间的通信。

当客户端开始写数据时(第三步),DFSOutputStream把写入的数据分成包(packet), 放入一个中间队列——数据队列(data queue)中去。DataStreamer从数据队列中取数据,同时向namenode申请一个新的block来存放它已经取得的数据。namenode选择一系列合适的datanode(个数由文件的replica数决定)构成一个管道线(pipeline),这里我们假设replica为3,所以管道线中就有三个datanode。DataSteamer把数据流式的写入到管道线中的第一个datanode中(第四步),第一个datanode再把接收到的数据转到第二个datanode中(第四步),以此类推。

DFSOutputStream同时也维护着另一个中间队列——确认队列(ack queue),确认队列中的包只有在得到管道线中所有的datanode的确认以后才会被移出确认队列(第五步)。

如果某个datanode在写数据的时候当掉了,下面这些对用户透明的步骤会被执行:

1)管道线关闭,所有确认队列上的数据会被挪到数据队列的首部重新发送,这样可以确保管道线中当掉的datanode下流的datanode不会因为当掉的datanode而丢失数据包。

2)在还在正常运行的datanode上的当前block上做一个标志,这样当当掉的datanode重新启动以后namenode就会知道该datanode上哪个block是刚才当机时残留下的局部损坏block,从而可以把它删掉。

3)已经当掉的datanode从管道线中被移除,未写完的block的其他数据继续被写入到其他两个还在正常运行的datanode中去,namenode知道这个block还处在under-replicated状态(也即备份数不足的状态)下,然后他会安排一个新的replica从而达到要求的备份数,后续的block写入方法同前面正常时候一样。

有可能管道线中的多个datanode当掉(虽然不太经常发生),但只要dfs.replication.min(默认为1)个replica被创建,我们就认为该创建成功了。剩余的replica会在以后异步创建以达到指定的replica数。

当客户端完成写数据后,它会调用close()方法(第六步)。这个 *** 作会冲洗(flush)所有剩下的package到pipeline中,等待这些package确认成功,然后通知namenode写入文件成功(第七步)。这时候namenode就知道该文件由哪些block组成(因为DataStreamer向namenode请求分配新block,namenode当然会知道它分配过哪些blcok给给定文件),它会等待最少的replica数被创建,然后成功返回。

replica是如何分布的

Hadoop在创建新文件时是如何选择block的位置的呢,综合来说,要考虑以下因素:带宽(包括写带宽和读带宽)和数据安全性。如果我们把三个备份全部放在一个datanode上,虽然可以避免了写带宽的消耗,但几乎没有提供数据冗余带来的安全性,因为如果这个datanode当机,那么这个文件的所有数据就全部丢失了。另一个极端情况是,如果把三个冗余备份全部放在不同的机架,甚至数据中心里面,虽然这样数据会安全,但写数据会消耗很多的带宽。Hadoop 0.17.0给我们提供了一个默认replica分配策略(Hadoop 1.X以后允许replica策略是可插拔的,也就是你可以自己制定自己需要的replica分配策略)。replica的默认分配策略是把第一个备份放在与客户端相同的datanode上(如果客户端在集群外运行,就随机选取一个datanode来存放第一个replica),第二个replica放在与第一个replica不同机架的一个随机datanode上,第三个replica放在与第二个replica相同机架的随机datanode上。如果replica数大于三,则随后的replica在集群中随机存放,Hadoop会尽量避免过多的replica存放在同一个机架上。选取replica的放置位置后,管道线的网络拓扑结构如下所示:

Hadoop

总体来说,上述默认的replica分配策略给了我们很好的可用性(blocks放置在两个rack上,较为安全),写带宽优化(写数据只需要跨越一个rack),读带宽优化(你可以从两个机架中选择较近的一个读取)。

一致性模型

HDFS某些地方为了性能可能会不符合POSIX(是的,你没有看错,POSIX不仅仅只适用于linux/unix, Hadoop 使用了POSIX的设计来实现对文件系统文件流的读取 ),所以它看起来可能与你所期望的不同,要注意。

创建了一个文件以后,它是可以在命名空间(namespace)中可以看到的:

Path p = new Path("p")

fs.create(p)

assertThat(fs.exists(p), is(true))

但是任何向此文件中写入的数据并不能保证是可见的,即使你flush了已经写入的数据,此文件的长度可能仍然为零:

Path p = new Path("p")

OutputStream out = fs.create(p)

out.write("content".getBytes("UTF-8"))

out.flush()

assertThat(fs.getFileStatus(p).getLen(), is(0L))

这是因为,在Hadoop中,只有满一个block数据量的数据被写入文件后,此文件中的内容才是可见的(即这些数据会被写入到硬盘中去),所以当前正在写的block中的内容总是不可见的。

Hadoop提供了一种强制使buffer中的内容冲洗到datanode的方法,那就是FSDataOutputStream的sync()方法。调用了sync()方法后,Hadoop保证所有已经被写入的数据都被冲洗到了管道线中的datanode中,并且对所有读者都可见了:

Path p = new Path("p")

FSDataOutputStream out = fs.create(p)

out.write("content".getBytes("UTF-8"))

out.flush()

out.sync()

assertThat(fs.getFileStatus(p).getLen(), is(((long) "content".length())))

这个方法就像POSIX中的fsync系统调用(它冲洗给定文件描述符中的所有缓冲数据到磁盘中)。例如,使用java API写一个本地文件,我们可以保证在调用flush()和同步化后可以看到已写入的内容:

FileOutputStream out = new FileOutputStream(localFile)

out.write("content".getBytes("UTF-8"))

out.flush()// flush to operating system

out.getFD().sync()// sync to disk (getFD()返回与该流所对应的文件描述符)

assertThat(localFile.length(), is(((long) "content".length())))

在HDFS中关闭一个流隐式的调用了sync()方法:

Path p = new Path("p")

OutputStream out = fs.create(p)

out.write("content".getBytes("UTF-8"))

out.close()

assertThat(fs.getFileStatus(p).getLen(), is(((long) "content".length())))

由于Hadoop中的一致性模型限制,如果我们不调用sync()方法的话,我们很可能会丢失多大一个block的数据。这是难以接受的,所以我们应该使用sync()方法来确保数据已经写入磁盘。但频繁调用sync()方法也是不好的,因为会造成很多额外开销。我们可以再写入一定量数据后调用sync()方法一次,至于这个具体的数据量大小就要根据你的应用程序而定了,在不影响你的应用程序的性能的情况下,这个数据量应越大越好。


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/sjk/9975592.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-03
下一篇 2023-05-03

发表评论

登录后才能评论

评论列表(0条)

保存