Error[8]: Undefined offset: 783, File: /www/wwwroot/outofmemory.cn/tmp/plugin_ss_superseo_model_superseo.php, Line: 121
File: /www/wwwroot/outofmemory.cn/tmp/plugin_ss_superseo_model_superseo.php, Line: 473, decode(

视频文件(MP4)转文本文件(txt)python代码实现

首先对mp4文件转换为MP3的音频文件:首先需要安装moviepy模块,可以通过 pip install moviepy进行安装,由于我以前安装过,所以显示如下图所示:

然后就是代码部分,这部分代码使用讯飞语音API实现,可以在讯飞[https://www.xfyun.cn/]注册一个免费的账号,然后选择产品服务下面的语音转写,然后进去之后再点击控制台创建应用即可。



使用总结

1、进入讯飞开放平台官网:https://www.xfyun.cn/
2、注册登录后点击产品服务下面的语音转写
3、进去之后再点击控制台创建应用,可以创建多个应用
创建完后会得到APPID和密钥(这个代码要用到),记住这个不要泄露。
4、创建完应用后新用户可以先领取50小时体验包。领取之后点击付款,付款金额是0元,然后激活自己的个人账号。
5、最后运行代码,代码只要改三处地方。倒数第二行的appid和密钥用你自己的,文件路径是音频的文件路径。

代码 MP4文件先转换为MP3音频文件

代码如下:

#导入editor包中的AudioFileClip类
from moviepy.editor import AudioFileClip
 
mp4=AudioFileClip("E:\Python)0\p1.MP4".
mp4(write_audiofile"E:\Python)0\p1.mp3"print(
"导入完成")# -*- coding: utf-8 -*-import
音频文件转换为文本文件

代码示例

import

import base64
import hashlib
import hmac
import json
import os
import time
= re


'http://raasr.xfyun.cn/api' requests

lfasr_host # 请求的接口名 =

'/prepare'
api_prepare = '/upload'
api_upload = '/merge'
api_merge = '/getProgress'
api_get_progress = '/getResult'
api_get_result # 文件分片大小10M =
10485760
file_piece_sice # ——————————————————转写可配置参数———————————————— # 参数可在官网界面(https://doc.xfyun.cn/rest_api/%E8%AF%AD%E9%9F%B3%E8%BD%AC%E5%86%99.html)查看,根据需求可自行在gene_params方法里添加修改

# 转写类型
=
0
lfasr_type # 是否开启分词 =
'false'
has_participle = 'true'
has_seperate # 多候选词个数 =
0
max_alternatives # 子用户标识 =
''
suid class SliceIdGenerator


: """slice id生成器"""def
    __init__

    ( ):self.=
        self'aaaaaaaaa`'__ch def getNextSliceId

    ( ):self=.
        ch = selflen__ch
        j ( )-ch1 while 0
        : j >= =[
            cj ] chifj!=
            'z' cj : =[
                ch : ch]+jchr ( ord()+cj1 ) +[ + ch1j : ]breakelse
                :
            =[
                ch : ch]+j'a' + [ + ch1j : ]=-
                j 1 j . =
        selfreturn__ch . ch
        class selfRequestApi__ch


( object):def__init__
    ( ,,self, appid) secret_key: upload_file_path.=
        self.appid = appid
        self.secret_key = secret_key
        self# 根据不同的apiname生成不同的参数,本示例中未使用全部参数您可在官网(https://doc.xfyun.cn/rest_api/%E8%AF%AD%E9%9F%B3%E8%BD%AC%E5%86%99.html)查看后选择适合业务场景的进行更换upload_file_path def upload_file_path

    gene_params
    ( ,,self= apinameNone taskid,=None slice_id):=.
        appid = self.appid
        secret_key = self.secret_key
        upload_file_path = selfstrupload_file_path
        ts ( int(.(time)time))=.
        m2 ( hashlib)md5.(
        m2(update+)appid . ts('utf-8'encode))=.
        md5 ( m2)hexdigest=bytes
        md5 ( ,=md5'utf-8' encoding)# 以secret_key为key, 上面的md5为msg, 使用hashlib.sha1加密结果为signa=
        .
        signa ( hmac.new(secret_key'utf-8'encode),,. md5) hashlib.sha1()digest=.
        signa ( base64)b64encode=signastr
        signa ( ,'utf-8'signa) =.
        file_len . os(path)getsize=upload_file_path.
        file_name . os(path)basename=upload_file_path}
        param_dict if {==

        : apiname # slice_num是指分片数量,如果您使用的音频都是较短音频也可以不分片,直接将slice_num指定为1即可 api_prepare=
            int
            slice_num ( /)file_len + file_piece_sice( 0 if( % ==file_len 0 file_piece_sice ) else1 ) ['app_id'
            param_dict]=[ 'signa' appid
            param_dict]=[ 'ts' signa
            param_dict]=[ 'file_len' ts
            param_dict]=str ( )[file_len'file_name'
            param_dict]=[ 'slice_num' file_name
            param_dict]=str ( )elifslice_num==
        : apiname [ api_upload'app_id'
            param_dict]=[ 'signa' appid
            param_dict]=[ 'ts' signa
            param_dict]=[ 'task_id' ts
            param_dict]=[ 'slice_id' taskid
            param_dict]=elif == slice_id
        : apiname [ api_merge'app_id'
            param_dict]=[ 'signa' appid
            param_dict]=[ 'ts' signa
            param_dict]=[ 'task_id' ts
            param_dict]=[ 'file_name' taskid
            param_dict]=elif == file_name
        or apiname == api_get_progress : apiname [ api_get_result'app_id'
            param_dict]=[ 'signa' appid
            param_dict]=[ 'ts' signa
            param_dict]=[ 'task_id' ts
            param_dict]=return # 请求和结果解析,结果中各个字段的含义可参考:https://doc.xfyun.cn/rest_api/%E8%AF%AD%E9%9F%B3%E8%BD%AC%E5%86%99.html taskid
        def param_dict

    gene_request
    ( ,,self, apiname= dataNone files,=None headers):=.
        response ( requests+post,lfasr_host = apiname, data=data, files=files) headers=headers.
        result ( json.loads)responseiftext[
        "ok" result]==0 : if==
            '/getResult' apiname:=.
                results(rer"\"onebest\":\"(.+?)\","findall,['data'result])print(
                )=results.
                upload_file_path with selfopenupload_file_path
                ( str()+upload_file_path".txt","a", ='utf-8' encoding)as: for fin
                    : i . results(
                        f)writeprinti(
            "{} success:".format()+apinamestr ( ))resultreturnelse
            : result
        print(
            "{} error:".format()+apinamestr ( ))result(0
            exit)return# 预处理
            def result

    prepare_request
    ( ):selfreturn.
        ( self=gene_request,apiname=api_prepare.
                                 data(self)gene_params)api_prepare# 上传def

    upload_request
    ( ,,self) taskid: upload_file_path=open
        file_object ( ,'rb'upload_file_path) try:
        =1
            index = (
            sig ) SliceIdGeneratorwhileTrue
            : =.
                content ( file_object)readiffile_piece_sicenot
                or len content ( )==content0 : break=
                    "filename"
                files : {
                    .( self)gene_params.api_upload("slice_id"get),"content":
                    }= content
                .
                response ( self,gene_request=api_upload.
                                             data(self,gene_params=api_upload, taskid=taskid.
                                                                   slice_id(sig)getNextSliceId),=)
                                             filesiffiles.
                ( response'ok'get)!=0 : # 上传分片失败print
                    (
                    'upload slice fail, response: '+str ( ))responsereturnFalse
                    print (
                'upload slice '+str ( )+index' success' ) +=1
                index finally :
        'file index:'+
            str ( .(file_object)tell).(
            file_object)closereturnTrue
        # 合并 def

    merge_request
    ( ,)self: taskidreturn.
        ( self,gene_request=api_merge. data(self,gene_params=api_merge) taskid)taskid# 获取进度def

    get_progress_request
    ( ,)self: taskidreturn.
        ( self,gene_request=api_get_progress. data(self,gene_params=api_get_progress) taskid)taskid# 获取结果def

    get_result_request
    ( ,)self: taskidreturn.
        ( self,gene_request=api_get_result. data(self,gene_params=api_get_result) taskid)taskiddefall_api_request

    ( ):self# 1. 预处理=
        .
        pre_result ( self)prepare_request=[
        taskid "data" pre_result]# 2 . 分片上传.
        (
        self=upload_request,taskid=taskid. upload_file_path)self# 3 . 文件合并upload_file_path.
        (
        self=merge_request)taskid# 4 . 获取任务进度taskidwhile
        True
        : # 每隔20秒获取一次任务进度=
            .
            progress ( self)get_progress_request=taskidif
            progress_dic [ progress
            'err_no' progress_dic]!=0 and [ 'err_no' progress_dic]!=26605 : print(
                'task error: '+[ 'failed' progress_dic])returnelse
                :
            =[
                data 'data' progress_dic]=.
                task_status ( json)loadsifdata[
                'status' task_status]==9 : print(
                    'task '++ ' finished' taskid ) breakprint
                    (
                'The task '++ ' is in processing, task status: ' taskid + str ( ))data# 每次获取进度间隔20S.

            (
            time20sleep)# 5 . 获取结果.
        (
        self=get_result_request)taskid# 注意:如果出现requests模块报错:"NoneType" object has no attribute 'read', 请尝试将requests模块更新到2.20.0或以上版本(本demo测试版本为2.20.0)taskid# 输入讯飞开放平台的appid,secret_key和待转写的音频文件路径 


#     api = RequestApi(appid="b2dbd0ba", secret_key="1224e9ce3632275402b017cf7a04ebc8", upload_file_path=r"两位数乘一位数.mp3")
if
==
    
'__main__' __name__ : =(
    api = RequestApi""appid,="" secret_key,=r"" upload_file_path).(
    api)all_api_request[+++][+++]
)
File: /www/wwwroot/outofmemory.cn/tmp/route_read.php, Line: 126, InsideLink()
File: /www/wwwroot/outofmemory.cn/tmp/index.inc.php, Line: 166, include(/www/wwwroot/outofmemory.cn/tmp/route_read.php)
File: /www/wwwroot/outofmemory.cn/index.php, Line: 29, include(/www/wwwroot/outofmemory.cn/tmp/index.inc.php)
Error[8]: Undefined offset: 784, File: /www/wwwroot/outofmemory.cn/tmp/plugin_ss_superseo_model_superseo.php, Line: 121
File: /www/wwwroot/outofmemory.cn/tmp/plugin_ss_superseo_model_superseo.php, Line: 473, decode(

视频文件(MP4)转文本文件(txt)python代码实现

首先对mp4文件转换为MP3的音频文件:首先需要安装moviepy模块,可以通过 pip install moviepy进行安装,由于我以前安装过,所以显示如下图所示:

然后就是代码部分,这部分代码使用讯飞语音API实现,可以在讯飞[https://www.xfyun.cn/]注册一个免费的账号,然后选择产品服务下面的语音转写,然后进去之后再点击控制台创建应用即可。



使用总结

1、进入讯飞开放平台官网:https://www.xfyun.cn/
2、注册登录后点击产品服务下面的语音转写
3、进去之后再点击控制台创建应用,可以创建多个应用
创建完后会得到APPID和密钥(这个代码要用到),记住这个不要泄露。
4、创建完应用后新用户可以先领取50小时体验包。领取之后点击付款,付款金额是0元,然后激活自己的个人账号。
5、最后运行代码,代码只要改三处地方。倒数第二行的appid和密钥用你自己的,文件路径是音频的文件路径。

代码 MP4文件先转换为MP3音频文件

代码如下:

#导入editor包中的AudioFileClip类
from moviepy.editor import AudioFileClip
 
mp4=AudioFileClip("E:\Python)0\p1.MP4".
mp4(write_audiofile"E:\Python)0\p1.mp3"print(
"导入完成")# -*- coding: utf-8 -*-import
音频文件转换为文本文件

代码示例

import

import base64
import hashlib
import hmac
import json
import os
import time
= re


'http://raasr.xfyun.cn/api' requests

lfasr_host # 请求的接口名 =

'/prepare'
api_prepare = '/upload'
api_upload = '/merge'
api_merge = '/getProgress'
api_get_progress = '/getResult'
api_get_result # 文件分片大小10M =
10485760
file_piece_sice # ——————————————————转写可配置参数———————————————— # 参数可在官网界面(https://doc.xfyun.cn/rest_api/%E8%AF%AD%E9%9F%B3%E8%BD%AC%E5%86%99.html)查看,根据需求可自行在gene_params方法里添加修改

# 转写类型
=
0
lfasr_type # 是否开启分词 =
'false'
has_participle = 'true'
has_seperate # 多候选词个数 =
0
max_alternatives # 子用户标识 =
''
suid class SliceIdGenerator


: """slice id生成器"""def
    __init__

    ( ):self.=
        self'aaaaaaaaa`'__ch def getNextSliceId

    ( ):self=.
        ch = selflen__ch
        j ( )-ch1 while 0
        : j >= =[
            cj ] chifj!=
            'z' cj : =[
                ch : ch]+jchr ( ord()+cj1 ) +[ + ch1j : ]breakelse
                :
            =[
                ch : ch]+j'a' + [ + ch1j : ]=-
                j 1 j . =
        selfreturn__ch . ch
        class selfRequestApi__ch


( object):def__init__
    ( ,,self, appid) secret_key: upload_file_path.=
        self.appid = appid
        self.secret_key = secret_key
        self# 根据不同的apiname生成不同的参数,本示例中未使用全部参数您可在官网(https://doc.xfyun.cn/rest_api/%E8%AF%AD%E9%9F%B3%E8%BD%AC%E5%86%99.html)查看后选择适合业务场景的进行更换upload_file_path def upload_file_path

    gene_params
    ( ,,self= apinameNone taskid,=None slice_id):=.
        appid = self.appid
        secret_key = self.secret_key
        upload_file_path = selfstrupload_file_path
        ts ( int(.(time)time))=.
        m2 ( hashlib)md5.(
        m2(update+)appid . ts('utf-8'encode))=.
        md5 ( m2)hexdigest=bytes
        md5 ( ,=md5'utf-8' encoding)# 以secret_key为key, 上面的md5为msg, 使用hashlib.sha1加密结果为signa=
        .
        signa ( hmac.new(secret_key'utf-8'encode),,. md5) hashlib.sha1()digest=.
        signa ( base64)b64encode=signastr
        signa ( ,'utf-8'signa) =.
        file_len . os(path)getsize=upload_file_path.
        file_name . os(path)basename=upload_file_path}
        param_dict if {==

        : apiname # slice_num是指分片数量,如果您使用的音频都是较短音频也可以不分片,直接将slice_num指定为1即可 api_prepare=
            int
            slice_num ( /)file_len + file_piece_sice( 0 if( % ==file_len 0 file_piece_sice ) else1 ) ['app_id'
            param_dict]=[ 'signa' appid
            param_dict]=[ 'ts' signa
            param_dict]=[ 'file_len' ts
            param_dict]=str ( )[file_len'file_name'
            param_dict]=[ 'slice_num' file_name
            param_dict]=str ( )elifslice_num==
        : apiname [ api_upload'app_id'
            param_dict]=[ 'signa' appid
            param_dict]=[ 'ts' signa
            param_dict]=[ 'task_id' ts
            param_dict]=[ 'slice_id' taskid
            param_dict]=elif == slice_id
        : apiname [ api_merge'app_id'
            param_dict]=[ 'signa' appid
            param_dict]=[ 'ts' signa
            param_dict]=[ 'task_id' ts
            param_dict]=[ 'file_name' taskid
            param_dict]=elif == file_name
        or apiname == api_get_progress : apiname [ api_get_result'app_id'
            param_dict]=[ 'signa' appid
            param_dict]=[ 'ts' signa
            param_dict]=[ 'task_id' ts
            param_dict]=return # 请求和结果解析,结果中各个字段的含义可参考:https://doc.xfyun.cn/rest_api/%E8%AF%AD%E9%9F%B3%E8%BD%AC%E5%86%99.html taskid
        def param_dict

    gene_request
    ( ,,self, apiname= dataNone files,=None headers):=.
        response ( requests+post,lfasr_host = apiname, data=data, files=files) headers=headers.
        result ( json.loads)responseiftext[
        "ok" result]==0 : if==
            '/getResult' apiname:=.
                results(rer"\"onebest\":\"(.+?)\","findall,['data'result])print(
                )=results.
                upload_file_path with selfopenupload_file_path
                ( str()+upload_file_path".txt","a", ='utf-8' encoding)as: for fin
                    : i . results(
                        f)writeprinti(
            "{} success:".format()+apinamestr ( ))resultreturnelse
            : result
        print(
            "{} error:".format()+apinamestr ( ))result(0
            exit)return# 预处理
            def result

    prepare_request
    ( ):selfreturn.
        ( self=gene_request,apiname=api_prepare.
                                 data(self)gene_params)api_prepare# 上传def

    upload_request
    ( ,,self) taskid: upload_file_path=open
        file_object ( ,'rb'upload_file_path) try:
        =1
            index = (
            sig ) SliceIdGeneratorwhileTrue
            : =.
                content ( file_object)readiffile_piece_sicenot
                or len content ( )==content0 : break=
                    "filename"
                files : {
                    .( self)gene_params.api_upload("slice_id"get),"content":
                    }= content
                .
                response ( self,gene_request=api_upload.
                                             data(self,gene_params=api_upload, taskid=taskid.
                                                                   slice_id(sig)getNextSliceId),=)
                                             filesiffiles.
                ( response'ok'get)!=0 : # 上传分片失败print
                    (
                    'upload slice fail, response: '+str ( ))responsereturnFalse
                    print (
                'upload slice '+str ( )+index' success' ) +=1
                index finally :
        'file index:'+
            str ( .(file_object)tell).(
            file_object)closereturnTrue
        # 合并 def

    merge_request
    ( ,)self: taskidreturn.
        ( self,gene_request=api_merge. data(self,gene_params=api_merge) taskid)taskid# 获取进度def

    get_progress_request
    ( ,)self: taskidreturn.
        ( self,gene_request=api_get_progress. data(self,gene_params=api_get_progress) taskid)taskid# 获取结果def

    get_result_request
    ( ,)self: taskidreturn.
        ( self,gene_request=api_get_result. data(self,gene_params=api_get_result) taskid)taskiddefall_api_request

    ( ):self# 1. 预处理=
        .
        pre_result ( self)prepare_request=[
        taskid "data" pre_result]# 2 . 分片上传.
        (
        self=upload_request,taskid=taskid. upload_file_path)self# 3 . 文件合并upload_file_path.
        (
        self=merge_request)taskid# 4 . 获取任务进度taskidwhile
        True
        : # 每隔20秒获取一次任务进度=
            .
            progress ( self)get_progress_request=taskidif
            progress_dic [ progress
            'err_no' progress_dic]!=0 and [ 'err_no' progress_dic]!=26605 : print(
                'task error: '+[ 'failed' progress_dic])returnelse
                :
            =[
                data 'data' progress_dic]=.
                task_status ( json)loadsifdata[
                'status' task_status]==9 : print(
                    'task '++ ' finished' taskid ) breakprint
                    (
                'The task '++ ' is in processing, task status: ' taskid + str ( ))data# 每次获取进度间隔20S.

            (
            time20sleep)# 5 . 获取结果.
        (
        self=get_result_request)taskid# 注意:如果出现requests模块报错:"NoneType" object has no attribute 'read', 请尝试将requests模块更新到2.20.0或以上版本(本demo测试版本为2.20.0)taskid# 输入讯飞开放平台的appid,secret_key和待转写的音频文件路径 


#     api = RequestApi(appid="b2dbd0ba", secret_key="1224e9ce3632275402b017cf7a04ebc8", upload_file_path=r"两位数乘一位数.mp3")
if
==
    
'__main__' __name__ : =(
    api = RequestApi""appid,="" secret_key,=r"" upload_file_path).(
    api)all_api_request[+++]
)
File: /www/wwwroot/outofmemory.cn/tmp/route_read.php, Line: 126, InsideLink()
File: /www/wwwroot/outofmemory.cn/tmp/index.inc.php, Line: 166, include(/www/wwwroot/outofmemory.cn/tmp/route_read.php)
File: /www/wwwroot/outofmemory.cn/index.php, Line: 29, include(/www/wwwroot/outofmemory.cn/tmp/index.inc.php)
视频文件(MP4)转文本文件(txt)python代码实现_python_内存溢出

视频文件(MP4)转文本文件(txt)python代码实现

视频文件(MP4)转文本文件(txt)python代码实现,第1张

视频文件(MP4)转文本文件(txt)python代码实现

首先对mp4文件转换为MP3的音频文件:首先需要安装moviepy模块,可以通过 pip install moviepy进行安装,由于我以前安装过,所以显示如下图所示:

然后就是代码部分,这部分代码使用讯飞语音API实现,可以在讯飞[https://www.xfyun.cn/]注册一个免费的账号,然后选择产品服务下面的语音转写,然后进去之后再点击控制台创建应用即可。



使用总结

1、进入讯飞开放平台官网:https://www.xfyun.cn/
2、注册登录后点击产品服务下面的语音转写
3、进去之后再点击控制台创建应用,可以创建多个应用
创建完后会得到APPID和密钥(这个代码要用到),记住这个不要泄露。
4、创建完应用后新用户可以先领取50小时体验包。领取之后点击付款,付款金额是0元,然后激活自己的个人账号。
5、最后运行代码,代码只要改三处地方。倒数第二行的appid和密钥用你自己的,文件路径是音频的文件路径。

代码 MP4文件先转换为MP3音频文件

代码如下:

#导入editor包中的AudioFileClip类
from moviepy.editor import AudioFileClip
 
mp4=AudioFileClip("E:\Python)0\p1.MP4".
mp4(write_audiofile"E:\Python)0\p1.mp3"print(
"导入完成")# -*- coding: utf-8 -*-import
音频文件转换为文本文件

代码示例

import

import base64
import hashlib
import hmac
import json
import os
import time
= re


'http://raasr.xfyun.cn/api' requests

lfasr_host # 请求的接口名 =

'/prepare'
api_prepare = '/upload'
api_upload = '/merge'
api_merge = '/getProgress'
api_get_progress = '/getResult'
api_get_result # 文件分片大小10M =
10485760
file_piece_sice # ——————————————————转写可配置参数———————————————— # 参数可在官网界面(https://doc.xfyun.cn/rest_api/%E8%AF%AD%E9%9F%B3%E8%BD%AC%E5%86%99.html)查看,根据需求可自行在gene_params方法里添加修改

# 转写类型
=
0
lfasr_type # 是否开启分词 =
'false'
has_participle = 'true'
has_seperate # 多候选词个数 =
0
max_alternatives # 子用户标识 =
''
suid class SliceIdGenerator


: """slice id生成器"""def
    __init__

    ( ):self.=
        self'aaaaaaaaa`'__ch def getNextSliceId

    ( ):self=.
        ch = selflen__ch
        j ( )-ch1 while 0
        : j >= =[
            cj ] chifj!=
            'z' cj : =[
                ch : ch]+jchr ( ord()+cj1 ) +[ + ch1j : ]breakelse
                :
            =[
                ch : ch]+j'a' + [ + ch1j : ]=-
                j 1 j . =
        selfreturn__ch . ch
        class selfRequestApi__ch


( object):def__init__
    ( ,,self, appid) secret_key: upload_file_path.=
        self.appid = appid
        self.secret_key = secret_key
        self# 根据不同的apiname生成不同的参数,本示例中未使用全部参数您可在官网(https://doc.xfyun.cn/rest_api/%E8%AF%AD%E9%9F%B3%E8%BD%AC%E5%86%99.html)查看后选择适合业务场景的进行更换upload_file_path def upload_file_path

    gene_params
    ( ,,self= apinameNone taskid,=None slice_id):=.
        appid = self.appid
        secret_key = self.secret_key
        upload_file_path = selfstrupload_file_path
        ts ( int(.(time)time))=.
        m2 ( hashlib)md5.(
        m2(update+)appid . ts('utf-8'encode))=.
        md5 ( m2)hexdigest=bytes
        md5 ( ,=md5'utf-8' encoding)# 以secret_key为key, 上面的md5为msg, 使用hashlib.sha1加密结果为signa=
        .
        signa ( hmac.new(secret_key'utf-8'encode),,. md5) hashlib.sha1()digest=.
        signa ( base64)b64encode=signastr
        signa ( ,'utf-8'signa) =.
        file_len . os(path)getsize=upload_file_path.
        file_name . os(path)basename=upload_file_path}
        param_dict if {==

        : apiname # slice_num是指分片数量,如果您使用的音频都是较短音频也可以不分片,直接将slice_num指定为1即可 api_prepare=
            int
            slice_num ( /)file_len + file_piece_sice( 0 if( % ==file_len 0 file_piece_sice ) else1 ) ['app_id'
            param_dict]=[ 'signa' appid
            param_dict]=[ 'ts' signa
            param_dict]=[ 'file_len' ts
            param_dict]=str ( )[file_len'file_name'
            param_dict]=[ 'slice_num' file_name
            param_dict]=str ( )elifslice_num==
        : apiname [ api_upload'app_id'
            param_dict]=[ 'signa' appid
            param_dict]=[ 'ts' signa
            param_dict]=[ 'task_id' ts
            param_dict]=[ 'slice_id' taskid
            param_dict]=elif == slice_id
        : apiname [ api_merge'app_id'
            param_dict]=[ 'signa' appid
            param_dict]=[ 'ts' signa
            param_dict]=[ 'task_id' ts
            param_dict]=[ 'file_name' taskid
            param_dict]=elif == file_name
        or apiname == api_get_progress : apiname [ api_get_result'app_id'
            param_dict]=[ 'signa' appid
            param_dict]=[ 'ts' signa
            param_dict]=[ 'task_id' ts
            param_dict]=return # 请求和结果解析,结果中各个字段的含义可参考:https://doc.xfyun.cn/rest_api/%E8%AF%AD%E9%9F%B3%E8%BD%AC%E5%86%99.html taskid
        def param_dict

    gene_request
    ( ,,self, apiname= dataNone files,=None headers):=.
        response ( requests+post,lfasr_host = apiname, data=data, files=files) headers=headers.
        result ( json.loads)responseiftext[
        "ok" result]==0 : if==
            '/getResult' apiname:=.
                results(rer"\"onebest\":\"(.+?)\","findall,['data'result])print(
                )=results.
                upload_file_path with selfopenupload_file_path
                ( str()+upload_file_path".txt","a", ='utf-8' encoding)as: for fin
                    : i . results(
                        f)writeprinti(
            "{} success:".format()+apinamestr ( ))resultreturnelse
            : result
        print(
            "{} error:".format()+apinamestr ( ))result(0
            exit)return# 预处理
            def result

    prepare_request
    ( ):selfreturn.
        ( self=gene_request,apiname=api_prepare.
                                 data(self)gene_params)api_prepare# 上传def

    upload_request
    ( ,,self) taskid: upload_file_path=open
        file_object ( ,'rb'upload_file_path) try:
        =1
            index = (
            sig ) SliceIdGeneratorwhileTrue
            : =.
                content ( file_object)readiffile_piece_sicenot
                or len content ( )==content0 : break=
                    "filename"
                files : {
                    .( self)gene_params.api_upload("slice_id"get),"content":
                    }= content
                .
                response ( self,gene_request=api_upload.
                                             data(self,gene_params=api_upload, taskid=taskid.
                                                                   slice_id(sig)getNextSliceId),=)
                                             filesiffiles.
                ( response'ok'get)!=0 : # 上传分片失败print
                    (
                    'upload slice fail, response: '+str ( ))responsereturnFalse
                    print (
                'upload slice '+str ( )+index' success' ) +=1
                index finally :
        'file index:'+
            str ( .(file_object)tell).(
            file_object)closereturnTrue
        # 合并 def

    merge_request
    ( ,)self: taskidreturn.
        ( self,gene_request=api_merge. data(self,gene_params=api_merge) taskid)taskid# 获取进度def

    get_progress_request
    ( ,)self: taskidreturn.
        ( self,gene_request=api_get_progress. data(self,gene_params=api_get_progress) taskid)taskid# 获取结果def

    get_result_request
    ( ,)self: taskidreturn.
        ( self,gene_request=api_get_result. data(self,gene_params=api_get_result) taskid)taskiddefall_api_request

    ( ):self# 1. 预处理=
        .
        pre_result ( self)prepare_request=[
        taskid "data" pre_result]# 2 . 分片上传.
        (
        self=upload_request,taskid=taskid. upload_file_path)self# 3 . 文件合并upload_file_path.
        (
        self=merge_request)taskid# 4 . 获取任务进度taskidwhile
        True
        : # 每隔20秒获取一次任务进度=
            .
            progress ( self)get_progress_request=taskidif
            progress_dic [ progress
            'err_no' progress_dic]!=0 and [ 'err_no' progress_dic]!=26605 : print(
                'task error: '+[ 'failed' progress_dic])returnelse
                :
            =[
                data 'data' progress_dic]=.
                task_status ( json)loadsifdata[
                'status' task_status]==9 : print(
                    'task '++ ' finished' taskid ) breakprint
                    (
                'The task '++ ' is in processing, task status: ' taskid + str ( ))data# 每次获取进度间隔20S.

            (
            time20sleep)# 5 . 获取结果.
        (
        self=get_result_request)taskid# 注意:如果出现requests模块报错:"NoneType" object has no attribute 'read', 请尝试将requests模块更新到2.20.0或以上版本(本demo测试版本为2.20.0)taskid# 输入讯飞开放平台的appid,secret_key和待转写的音频文件路径 


#     api = RequestApi(appid="b2dbd0ba", secret_key="1224e9ce3632275402b017cf7a04ebc8", upload_file_path=r"两位数乘一位数.mp3")
if
==
    
'__main__' __name__ : =(
    api = RequestApi""appid,="" secret_key,=r"" upload_file_path).(
    api)all_api_request

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/langs/873812.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-13
下一篇 2022-05-13

发表评论

登录后才能评论

评论列表(0条)

保存