目前,我们通过命令行参数运行处理.将来我们希望它作为主服务器的一部分运行,以便在执行处理时检查进度.
func run(using context: CommandContext) throws -> Future<VoID> { let table = "\"RecRegAction\"" let cursorname = "\"action_cursor\"" let chunkSize = 10_000 return context.container.withNewConnection(to: .psql) { connection in return PostgresqlDatabase.transactionExecute({ connection -> Future<Int> in return connection.simplequery("DECLARE \(cursorname) CURSOR FOR SELECT * FROM \(table)").map { result in var totalResults = 0 var finished : Bool = false while !finished { let results = try connection.raw("FETCH \(chunkSize) FROM \(cursorname)").all(deCoding: RecRegAction.self).wait() if results.count > 0 { totalResults += results.count print(totalResults) // ObvIoUsly we do our processing here } else { finished = true } } return totalResults } },on: connection) }.transform(to: ())}
现在这不起作用,因为我正在调用wait()并且我得到错误“Precondition Failed:在EventLoop上不能调用wait()”这是公平的.我面临的一个问题是,我不知道你是如何在主要的事件循环中运行这样在后台线程上运行这样的东西.我知道BlockingIOThreadPool,但似乎仍然在同一个EventLoop上运行并仍然导致错误.虽然我能够通过越来越复杂的方式来实现这一目标,但我希望我找不到一个优雅的解决方案,也许对SwiftNIO和Fluent有更好了解的人可以提供帮助.
编辑:要明确,这个目标显然不是要总计数据库中的 *** 作数.目标是使用游标同步处理每个 *** 作.当我读到结果时,我会检测到 *** 作中的更改,然后将它们的批次抛出到处理线程中.当所有线程都忙时,我不会再次从光标读取,直到它们完成.
这些行动很多,一次运行多达4500万.聚合承诺和递归似乎并不是一个好主意,当我尝试它时,只是为了它,服务器挂起.
这是一个处理密集型任务,可以在一个线程上运行数天,所以我不关心创建新线程.问题是我无法弄清楚如何在Command中使用wait()函数,因为我需要一个容器来创建数据库连接,而我唯一可以访问的是context.container对此调用wait()会导致以上错误.
TIA
解决方法 好的,如你所知,问题在于以下几点:while ... { ... try connection.raw("...").all(deCoding: RecRegAction.self).wait() ...}
你想等待一些结果,因此你使用while循环和.wait()来获得所有中间结果.本质上,这是在事件循环中将异步代码转换为同步代码.这可能会导致死锁,并且肯定会阻止其他连接,这就是为什么SwiftNIO会尝试检测到并导致错误的原因.我不会详细说明为什么它会阻止其他连接,或者为什么这可能会导致这个答案陷入僵局.
让我们看看我们有什么选择来解决这个问题:
>正如你所说,我们可以在另一个不是事件循环线程之一的线程上使用.wait().对于这个任何非EventLoop线程会这样做:dispatchQueue或你可以使用BlockingIOThreadPool(它不在EventLoop上运行)
>我们可以将您的代码重写为异步
两种解决方案都可以工作,但(1)实际上并不可取,因为您只需要等待结果就可以刻录整个(内核)线程.并且dispatch和BlockingIOThreadPool都有他们愿意产生的有限数量的线程,所以如果你经常这样做,你可能会耗尽线程,所以它需要更长的时间.
因此,让我们看看如何在累积中间结果的同时多次调用异步函数.然后,如果我们累积了所有中间结果,则继续所有结果.
为了简化 *** 作,让我们看看与您的功能非常相似的功能.我们假设这个函数就像你的代码一样提供
/// delivers partial results (integers) and `nil` if no further elements are availablefunc deliverPartialResult() -> EventLoopFuture<Int?> { ...}
我们现在想要的是一个新功能
func deliverFullResult() -> EventLoopFuture<[Int]>
请注意deliverPartialResult每次返回一个整数,deliverFullResult传递一个整数数组(即所有整数).好的,那么我们如何编写deliverFullResult而不调用deliverPartialResult().wait()?
那这个呢:
func accumulateResults(eventLoop: EventLoop,partialResultsSoFar: [Int],getPartial: @escaPing () -> EventLoopFuture<Int?>) -> EventLoopFuture<[Int]> { // let's run getPartial once return getPartial().then { partialResult in // we got a partial result,let's check what it is if let partialResult = partialResult { // another intermediate results,let's accumulate and call getPartial again return accumulateResults(eventLoop: eventLoop,partialResultsSoFar: partialResultsSoFar + [partialResult],getPartial: getPartial) } else { // we've got all the partial results,yay,let's fulfill the overall future return eventLoop.newSucceededFuture(result: partialResultsSoFar) } }}
鉴于accumulateResults,实现deliverFullResult不再太难了:
func deliverFullResult() -> EventLoopFuture<[Int]> { return accumulateResults(eventLoop: myCurrentEventLoop,partialResultsSoFar: [],getPartial: deliverPartialResult)}
但让我们更多地了解accumulateResults的作用:
>它调用getPartial一次,然后调用它
>检查我们是否有
>部分结果,在这种情况下,我们将记住它与其他partialResultsSoFar并返回(1)
> nil,这意味着partialResultsSoFar就是我们得到的所有东西,我们回归到了一个新的成功未来
那已经是真的了.我们在这里做的是将同步循环转换为异步递归.
好的,我们查看了很多代码,但这与你的功能现在有什么关系?
信不信由你,但这实际上应该有效(未经测试):
accumulateResults(eventLoop: el,partialResultsSoFar: []) { connection.raw("FETCH \(chunkSize) FROM \(cursorname)") .all(deCoding: RecRegAction.self) .map { results -> Int? in if results.count > 0 { return results.count } else { return nil } }}.map { allResults in return allResults.reduce(0,+)}
所有这些的结果将是EventLoopFuture< Int>它携带所有中间result.count的总和.
当然,我们首先将所有计数收集到一个数组中,然后在最后总结它(allResults.reduce(0,)),这有点浪费但也不是世界末日.我这样离开它是因为这使得accumulateResults可用于你想在数组中累积部分结果的其他情况.
现在最后一件事,一个真正的accumulateResults函数可能在元素类型上是通用的,我们也可以消除外部函数的partialResultsSoFar参数.那这个呢?
func accumulateResults<T>(eventLoop: EventLoop,getPartial: @escaPing () -> EventLoopFuture<T?>) -> EventLoopFuture<[T]> { // this is an inner function just to hIDe it from the outsIDe which carrIEs the accumulator func accumulateResults<T>(eventLoop: EventLoop,partialResultsSoFar: [T] /* our accumulator */,getPartial: @escaPing () -> EventLoopFuture<T?>) -> EventLoopFuture<[T]> { // let's run getPartial once return getPartial().then { partialResult in // we got a partial result,let's check what it is if let partialResult = partialResult { // another intermediate results,let's accumulate and call getPartial again return accumulateResults(eventLoop: eventLoop,getPartial: getPartial) } else { // we've got all the partial results,let's fulfill the overall future return eventLoop.newSucceededFuture(result: partialResultsSoFar) } } } return accumulateResults(eventLoop: eventLoop,getPartial: getPartial)}
编辑:编辑后,您的问题表明您实际上并不想积累中间结果.所以我的猜测是,您希望在收到每个中间结果后进行一些处理.如果这是你想要做的,也许试试这个:
func processpartialResults<T,V>(eventLoop: EventLoop,process: @escaPing (T) -> EventLoopFuture<V>,getPartial: @escaPing () -> EventLoopFuture<T?>) -> EventLoopFuture<V?> { func processpartialResults<T,soFar: V?,getPartial: @escaPing () -> EventLoopFuture<T?>) -> EventLoopFuture<V?> { // let's run getPartial once return getPartial().then { partialResult in // we got a partial result,let's call the process function and move on return process(partialResult).then { v in return processpartialResults(eventLoop: eventLoop,soFar: v,process: process,getPartial: getPartial) } } else { // we've got all the partial results,let's fulfill the overall future return eventLoop.newSucceededFuture(result: soFar) } } } return processpartialResults(eventLoop: eventLoop,soFar: nil,getPartial: getPartial)}
这将(如前所述)运行getPartial,直到它返回nil,但不是累积所有getPartial的结果,而是调用获得部分结果的进程,并且可以进行一些进一步的处理.当满足EventLoopFuture进程返回时,将发生下一个getPartial调用.
那更接近你想要的吗?
注意:我在这里使用了SwiftNIO的EventLoopFuture类型,在Vapor中您只需使用Future,但代码的其余部分应该相同.
总结以上是内存溢出为你收集整理的swift – 使用Vapor 3创建和使用游标全部内容,希望文章能够帮你解决swift – 使用Vapor 3创建和使用游标所遇到的程序开发问题。
如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)