使用Python和Flask流数据

使用Python和Flask流数据,第1张

使用Python和Flask流数据

要替换页面上的现有内容,您可能需要Javascript,例如,可以发送它或使其为您发出请求,使用长时间轮询,websocket等。有许多方法可以使用,这是一种使用服务器发送事件的方法:

#!/usr/bin/env pythonimport itertoolsimport timefrom flask import Flask, Response, redirect, request, url_forapp = Flask(__name__)@app.route('/')def index():    if request.headers.get('accept') == 'text/event-stream':        def events(): for i, c in enumerate(itertools.cycle('|/-')):     yield "data: %s %dnn" % (c, i)     time.sleep(.1)  # an artificial delay        return Response(events(), content_type='text/event-stream')    return redirect(url_for('static', filename='index.html'))if __name__ == "__main__":    app.run(host='localhost', port=23423)

哪里

static/index.html

<!doctype html><title>Server Send Events Demo</title><style>  #data {    text-align: center;  }</style><script src="http://pre.jquery.com/jquery-latest.js"></script><script>if (!!window.EventSource) {  var source = new EventSource('/');  source.onmessage = function(e) {    $("#data").text(e.data);  }}</script><div id="data">nothing received yet</div>

如果连接断开,浏览器默认会在3秒内重新连接。如果没有其他要发送的内容,则服务器可以返回404或仅发送

'text/event-stream'
内容类型以外的其他内容来响应下一个请求。即使服务器上有更多数据,也要在客户端停止,可以致电
source.close()

注意:如果流不是无限的,则使用其他技术(而不是SSE),例如,发送javascript代码段替换文本(无限

<iframe>
技术):

#!/usr/bin/env pythonimport timefrom flask import Flask, Responseapp = Flask(__name__)@app.route('/')def index():    def g():        yield """<!doctype html><title>Send javascript snippets demo</title><style>  #data {    text-align: center;  }</style><script src="http://pre.jquery.com/jquery-latest.js"></script><div id="data">nothing received yet</div>"""        for i, c in enumerate("hello"): yield """<script>  $("#data").text("{i} {c}")</script>""".format(i=i, c=c) time.sleep(1)  # an artificial delay    return Response(g())if __name__ == "__main__":    app.run(host='localhost', port=23423)

我已在此处内联html以显示没有更多内容(没有魔术)。与上面相同,但使用模板:

#!/usr/bin/env pythonimport timefrom flask import Flask, Responseapp = Flask(__name__)def stream_template(template_name, **context):    # http://flask.pocoo.org/docs/patterns/streaming/#streaming-from-templates    app.update_template_context(context)    t = app.jinja_env.get_template(template_name)    rv = t.stream(context)    # uncomment if you don't need immediate reaction    ##rv.enable_buffering(5)    return [email protected]('/')def index():    def g():        for i, c in enumerate("hello"*10): time.sleep(.1)  # an artificial delay yield i, c    return Response(stream_template('index.html', data=g()))if __name__ == "__main__":    app.run(host='localhost', port=23423)

哪里

templates/index.html

<!doctype html><title>Send javascript with template demo</title><style>  #data {    text-align: center;  }</style><script src="http://pre.jquery.com/jquery-latest.js"></script><div id="data">nothing received yet</div>{% for i, c in data: %}<script>  $("#data").text("{{ i }} {{ c }}")</script>{% endfor %}


欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5666179.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存