首页 文章

Azure Data Lake Store - 远程主机强制关闭现有连接

提问于
浏览
1

我使用DataLakeStoreFileSystemManagementClient类从Data Lake Store读取文件 . 我们用这样的代码为文件打开一个蒸汽,逐字节读取并处理它 . 这是一个特殊情况,我们不能使用U-SQL进行数据处理 .

m_adlsFileSystemClient = new DataLakeStoreFileSystemManagementClient(…);
return m_adlsFileSystemClient.FileSystem.OpenAsync(m_connection.AccountName, path);

此过程可能需要60分钟才能读取和处理文件 . 问题是:我经常在流读取过程中收到“远程主机强行关闭现有连接 . ”异常 . 特别是当阅读需要20分钟或更长时间 . 它不应该是超时,因为我使用正确的客户端超时设置创建DataLakeStoreFileSystemManagementClient . 您可以在下面找到例外情况 . 该异常看起来是随机的,很难预测何时获得它 . 处理时间可以是第15分钟和第50分钟 .

从Data Lake Store读取文件是否正常?对Data Bo Store文件保持开放流的总时间是否有任何限制(或建议)?

例外:

System.AggregateException: One or more errors occurred. -
--> System.IO.IOException: Unable to read data from the transport connection: An
existing connection was forcibly closed by the remote host. ---> System.Net.Soc
kets.SocketException: An existing connection was forcibly closed by the remote h
ost
   at System.Net.Sockets.Socket.Receive(Byte[] buffer, Int32 offset, Int32 size,
SocketFlags socketFlags)
   at System.Net.Sockets.NetworkStream.Read(Byte[] buffer, Int32 offset, Int32 s
ize)
   --- End of inner exception stack trace ---
   at System.Net.ConnectStream.Read(Byte[] buffer, Int32 offset, Int32 size)
   at System.Net.Http.HttpClientHandler.WebExceptionWrapperStream.Read(Byte[] bu
ffer, Int32 offset, Int32 count)
   at System.Net.Http.DelegatingStream.Read(Byte[] buffer, Int32 offset, Int32 c
ount)
   at DataLake.Timeout.Research.FileDownloader.CopyStream(Stream input, Stream o
utput) in C:\TFS-SED\Main\Platform\DataNode\DataLake\DataLake.Timeout.Research\F
ileDownloader.cs:line 107
   at DataLake.Timeout.Research.FileDownloader.<DownloadFileAsync>d__6.MoveNext(
) in C:\TFS-SED\Main\Platform\DataNode\DataLake\DataLake.Timeout.Research\FileDo
wnloader.cs:line 96

2 回答

  • 0

    为避免这些类型的问题,建议采用以下方法:

    • 阅读较小的,可重复的块 . 根据我的经验,我发现4MB块最好,并提供最佳性能 . 此外,通过以较小的增量读取,您可以合并重试逻辑,以便在发生故障时从相同的偏移量重试 .

    如果您不知道流的大小(例如,在读取时它被另一个工作程序附加),则可以在“BadOffsetException”的有效负载中检查带有RemoteException的400错误 . 这将表明您已经开始超出文件末尾的偏移量 .

    const int MAX_BYTES_TO_READ = 4 * 1024 * 1024; //4MB
    …
    long offset = 0;
    while(notDone)
    {
    try
    {
                    var myStream = client.Read(accountName, offset, MAX_BYTES_TO_READ)
                    // do stuff with stream
    }
    catch(WebException ex)
    {
                    // read the web exception response
                    If (response.contains(“BadOffsetException”))
                                    noteDone = false;
    }
    }
    
  • 3

    谢谢阿米特,你的建议终于帮助了我 . 那是我的版本 . 该示例正在使用重试逻辑读取一批字节 . 我通过BufferedSteam使用4MB缓冲区 . 因此客户端可以按对象读取流对象,但我们以4MB批量请求服务 .

    while (!m_endOfFile)
    {
        try
        {
            var inputStream = m_client.OpenReadFile(
                m_filePath,
                length: count, 
                offset: m_position);
    
                var memoryStream = new MemoryStream(count);
                inputStream.CopyTo(memoryStream);
                m_position += memoryStream.Length;
                result = memoryStream.ToArray();
                break;
        }
        catch (CloudException ex)
        {
            if (ex.Response.Content.ToString().Contains("Invalid offset value"))
            {
                m_endOfFile = true;
            }
            else
            {
                throw;
            }
            }
        catch (IOException)
        {
            repeats++;
            if (repeats >= RepeatCount)
            {
                throw;
            }
        }  
    }
    

相关问题