Перед разработчиком приложений Windows нередко встаёт вопрос, как обмениваться данными между двумя и более приложениями. Это бывает необходимо, например, если вы разрабатываете службу Windows и приложение для настройки и мониторинга за этой службой.
Для обмена данными между приложениями Windows существует много механизмов: буфер обмена (Clipboard), технология COM, передача данных между окнами процессов с помощью сообщения WM_COPYDATA, удаленный вызов процедур (RPC), сокеты (Windows Sockets), динамический обмен данными (DDE), именованные каналы (Named Pipes), почтовый ящик (Mailslot) и проецируемые в память файлы (Mapped file). У каждого способа есть свои плюсы и минусы. Здесь я не буду их рассматривать.
В данной статье, я хотел бы рассмотреть только один из перечисленных механизмов – именованные каналы (Named Pipes). Вообще существует два вида каналов: именованные и анонимные. Анонимные каналы обычно создаются родительским процессом и передаются дочернему процессу при его создании. У именованных каналов есть имя и любой процесс, знающий это имя, может подключиться к каналу. В любом случае, при работе с каналами у вас есть сервер (приложение создающее канал) и клиенты (приложения, подключающиеся к каналу).
Для работы с каналами в Windows есть системные функции, такие как CreateNamedPipe, TransactNamedPipe и другие. Но реализация Pipe-сервера и Pipe-клиента не лёгкая задача, поэтому я отправился на поиски готовых реализаций в Интернет.
Демонстрационные классы FWIOCompletionPipe
Первое, что я нашел решение FWIOCompletionPipes. Собственно это демонстрационная реализация сервера и клиента. В архиве вы найдёте юнит FWIOCompletionPipes.pas и примеры. В юните FWIOCompletionPipes.pas есть класс для создания сервера TFWPipeServer и класс клиента TFWPipeClient. Сразу скажу, что работу сервера я не проверял, а клиент пригодился, но об этом ниже.
Юнит Pipes
Отличная реализация Pipe-клиента и Pipe-сервера попалась мне на блоге Mick's Mix. Чтобы код заработал под Delphi XE3 без ошибок и предупреждений, пришлось кое-что поправить. Мой исправленный вариант вы можете скачать здесь:
Файлы:
Есть вариант юнита c поддержкой всех версий Delphi и платформы Win64:
В юните есть класс-сервер TPipeServer, класс-клиент TPipeClient и даже есть приятный бонус - класс для запуска и управления консольным приложением с перехватом потока вывода TPipeConsole. Юнит лучше всего подключить к новому bpl-проекту и инсталлировать в среде Delphi. Тогда компоненты будут доступны в окне инструментов Tool Palette.
Пользоваться компонентами одно удовольствие. Чтобы создать сервер, создайте экземпляр класса TPipeServer (или положите его на форму или модуль данных), задайте имя канала, установите свойство Active в true и обрабатывайте событие OnPipeMessage. Пример обработки полученного сообщения (здесь и ниже будем считать, что в качестве сообщения мы передаём и получаем XML-документ):
procedure TForm1.PipeServer1PipeMessage(Sender: TObject; Pipe: NativeUInt;
Stream: TStream);
var
msg: IXMLDocument;
begin
msg := TXMLDocument.Create(nil);
//Загружаем XML-документ из потока.
msg.LoadFromStream(Stream);
//Отображаем полученный XML-документ.
ShowMessage(msg.XML.Text);
end;
Теперь, чтобы отправить сообщение серверу нужно воспользоваться функцией SendStream:
procedure TForm2.Button1Click(Sender: TObject);
var
xml: IXMLDocument;
memoryStream: TMemoryStream;
begin
memoryStream := TMemoryStream.Create;
try
//Создаём XML-документ для отправки.
xml := TXMLDocument.Create(nil);
xml.LoadFromXML('<MyMessage />');
//Сохраняем XML-документ в поток.
xml.SaveToStream(memoryStream);
//Подключаемся к Pipe-серверу.
PipeClient1.Connect;
//Отправляем данные серверу.
PipeClient1.SendStream(memoryStream);
finally
memoryStream.Free;
end;
end;
Теперь разберёмся, как ответить клиенту на сообщение. Это можно сделать так:
procedure TForm1.PipeServer1PipeMessage(Sender: TObject; Pipe: NativeUInt;
Stream: TStream);
var
xml: IXMLDocument;
begin
xml := TXMLDocument.Create(nil);
//Загружаем XML-документ из потока.
xml.LoadFromStream(Stream);
//Отображаем полученный XML-документ.
ShowMessage(xml.XML.Text);
//Отправляем клиенту полученное сообщение обратно.
TPipeServer(Sender).SendStream(Pipe, Stream);
end;
Если нам нужно отправить сообщение всем клиентам (сделать рассылку), это можно сделать так:
procedure TForm1.Button1Click(Sender: TObject);
var
i: Integer;
xml: IXMLDocument;
memoryStream: TMemoryStream;
begin
memoryStream := TMemoryStream.Create;
try
//Создаём XML-документ для отправки.
xml := TXMLDocument.Create(nil);
xml.LoadFromXML('<MyMessage />');
//Сохраняем XML-документ в поток.
xml.SaveToStream(memoryStream);
//Рассылаем сообщение всем клиентам.
for i := 0 to PipeServer1.ClientCount - 1 do
PipeServer1.SendStream(PipeServer1.Clients[i], memoryStream);
finally
memoryStream.Free;
end;
end;
При всём удобстве компонентов есть одна загвоздка. Если с клиента вам нужно отправить сообщение серверу и сразу синхронно получить ответ, то такой режим работы не поддерживается. Немножко поискав, я нашел решение для консольного приложения:
program CmdClient;
{$APPTYPE CONSOLE}
uses
Windows, Messages, SysUtils, Pipes;
type
TPipeEventHandler = class(TObject)
public
procedure OnPipeSent(Sender: TObject; Pipe: HPIPE; Size: DWORD);
end;
procedure TPipeEventHandler.OnPipeSent(Sender: TObject; Pipe: HPIPE; Size: DWORD);
begin
WriteLn('On Pipe Sent has executed!');
end;
var
lpMsg: TMsg;
WideChars: Array [0..255] of WideChar;
myString: String;
iLength: Integer;
pcHandler: TPipeClient;
peHandler: TPipeEventHandler;
begin
// Create message queue for application
PeekMessage(lpMsg, 0, WM_USER, WM_USER, PM_NOREMOVE);
// Create client pipe handler
pcHandler:=TPipeClient.CreateUnowned;
// Resource protection
try
// Create event handler
peHandler:=TPipeEventHandler.Create;
// Resource protection
try
// Setup clien pipe
pcHandler.PipeName:='myNamedPipe';
pcHandler.ServerName:='.';
pcHandler.OnPipeSent:=peHandler.OnPipeSent;
// Resource protection
try
// Connect
if pcHandler.Connect(5000) then
begin
// Dispatch messages for pipe client
while PeekMessage(lpMsg, 0, 0, 0, PM_REMOVE) do DispatchMessage(lpMsg);
// Setup for send
myString:='the message I am sending';
iLength:=Length(myString) + 1;
StringToWideChar(myString, wideChars, iLength);
// Send pipe message
if pcHandler.Write(wideChars, iLength * 2) then
begin
// Flush the pipe buffers
pcHandler.FlushPipeBuffers;
// Get the message
if GetMessage(lpMsg, pcHandler.WindowHandle, 0, 0) then DispatchMessage(lpMsg);
end;
end
else
// Failed to connect
WriteLn('Failed to connect to ', pcHandler.PipeName);
finally
// Show complete
Write('Complete...');
// Delay
ReadLn;
end;
finally
// Disconnect event handler
pcHandler.OnPipeSent:=nil;
// Free event handler
peHandler.Free;
end;
finally
// Free pipe client
pcHandler.Free;
end;
end.
Но этот случай не подходит для оконного приложения. Здесь мне пришлось изобретать велосипед, чтобы получить ответ от сервера синхронно. Я использовал компонент TFWPipeClient, что описан выше, чтобы отправить сообщение и получить ответ. Но выяснилось, что компонент TPipeServer отправляет клиенту не просто данные, а так называемый пакет данных. Т.е. сначала он отправляет клиенту маркер начала пакета, затем сами данные (т.к. если объём данных большой, то данные разбиваются на куски) и, в конце, маркер конца пакета. Также мне понадобился хэндл канала открытого компонентом TFWPipeClient поэтому пришлось добавить свойство Pipe. Мой дополненный вариант юнита FWIOCompletionPipes.pas можете скачать здесь:
В итоге, синхронная отправка сообщения серверу и получение ответа выглядит так:
//Функция отправляет сообщение серверу и ждёт ответа, если параметр waitForAnswer выставлен в true.
function TForm2.SendMessageToServer(Message: IXMLDocument; waitForAnswer: boolean): IXMLDocument;
var
inputStream, outputStream: TMemoryStream;
fwPipeClient: TFWPipeClient;
buff: array [0..MAXWORD - 1] of Char;
lpBytesRead: DWORD;
//Функция проверки, являются ли полученные данные маркерами начала и конца пакета.
function IsPacketBound(buffer: Pointer; size: Int64; controlCode: Cardinal): boolean;
var
lpControlMsg : PPipeMsgBlock;
begin
result := false;
if size = SizeOf(TPipeMsgBlock) then
begin
lpControlMsg := PPipeMsgBlock(buffer);
if (lpControlMsg^.Size = SizeOf(TPipeMsgBlock))
and (lpControlMsg^.MagicStart = MB_MAGIC)
and (lpControlMsg^.MagicEnd = MB_MAGIC)
and (lpControlMsg^.ControlCode = controlCode) then
result := true;
end;
end;
begin
result := nil;
inputStream := TMemoryStream.Create;
try
Message.SaveToStream(inputStream);
if not waitForAnswer then
//Если не нужно ожидать ответа, то отправляем сообщение как обычно.
PipeClient1.SendStream(inputStream)
else
begin
//Создаём альтернативный Pipe-клиент.
fwPipeClient := TFWPipeClient.Create('.', PipeClient1.PipeName);
try
//Подключаем Pipe-клиент к серверу.
fwPipeClient.Active := true;
outputStream := TMemoryStream.Create;
try
//Отправляем данные серверу.
fwPipeClient.SendData(inputStream, outputStream);
//Если сервер вернул маркер начала пакета, то будем считывать весь пакет.
if IsPacketBound(outputStream.Memory, outputStream.Size, MB_START) then
begin
outputStream.Size := 0;
while True do
begin
if ReadFile(fwPipeClient.Pipe, buff[0], MAXWORD, lpBytesRead, nil) then
begin
if IsPacketBound(@buff[0], lpBytesRead, MB_END) then
break
else
outputStream.Write(buff, lpBytesRead);
end;
end;
end;
result := TXMLDocument.Create(nil);
result.LoadFromStream(outputStream);
finally
outputStream.Free;
end;
finally
fwPipeClient.Free;
end;
end;
finally
inputStream.Free;
end;
end;
А вот пример использования функции SendMessageToServer:
procedure TForm2.Button2Click(Sender: TObject);
var
msg, answer: IXMLDocument;
begin
//Создаём XML-документ для отправки.
msg := TXMLDocument.Create(nil);
msg.LoadFromXML('<MyMessage />');
//Отправляем сообщение и получаем ответ.
answer := SendMessageToServer(msg, true);
//Отображаем полученное от сервера сообщение.
ShowMessage(answer.XML.Text);
end;
Позже, при тестировании, выяснилось, что в классе TFWPipeClient неправильно реализовано подключение к каналу, поэтому при активной работе с каналом часто возникает ошибка System Error. Code: 231. Все копии канала заняты. Поэтому мне пришлось переписать функцию SendMessageToServer отказавшись от использования класса TFWPipeClient:
//Функция отправляет сообщение серверу и ждёт ответа, если параметр waitForAnswer выставлен в true.
function TForm2.SendMessageToServer(Message: IXMLDocument; waitForAnswer: boolean): IXMLDocument;
var
inputStream, outputStream: TMemoryStream;
buff: array [0..MAXWORD - 1] of Char;
lpBytesRead: DWORD;
lpNumberOfBytesWritten: DWORD;
lpMode: DWORD;
pipe: THandle;
//Функция проверки, являются ли полученные данные маркерами начала и конца пакета.
function IsPacketBound(buffer: Pointer; size: Int64; controlCode: Cardinal): boolean;
var
lpControlMsg : PPipeMsgBlock;
begin
result := false;
if size = SizeOf(TPipeMsgBlock) then
begin
lpControlMsg := PPipeMsgBlock(buffer);
if (lpControlMsg^.Size = SizeOf(TPipeMsgBlock))
and (lpControlMsg^.MagicStart = MB_MAGIC)
and (lpControlMsg^.MagicEnd = MB_MAGIC)
and (lpControlMsg^.ControlCode = controlCode) then
result := true;
end;
end;
begin
result := nil;
inputStream := TMemoryStream.Create;
try
Message.SaveToStream(inputStream);
if not waitForAnswer then
//Если не нужно ожидать ответа, то отправляем сообщение как обычно.
PipeClient1.SendStream(inputStream)
else
begin
pipe := INVALID_HANDLE_VALUE;
try
//Подключаемся к каналу.
while True do
begin
pipe := CreateFile(PChar('\\.\pipe\' + PipeClient1.PipeName), GENERIC_READ or GENERIC_WRITE, 0, nil, OPEN_EXISTING, 0, 0);
if pipe <> INVALID_HANDLE_VALUE then
break;
if GetLastError <> ERROR_PIPE_BUSY then
RaiseLastOSError;
Win32Check(WaitNamedPipe(PChar('\\.\pipe\' + PipeClient1.PipeName), NMPWAIT_WAIT_FOREVER));
end;
lpMode := PIPE_READMODE_MESSAGE;
Win32Check(SetNamedPipeHandleState(pipe, lpMode, nil, nil));
//Отправляем данные серверу.
Win32Check(WriteFile(pipe, inputStream.Memory^, inputStream.Size, lpNumberOfBytesWritten, nil));
outputStream := TMemoryStream.Create;
try
Win32Check(ReadFile(pipe, buff[0], MAXWORD, lpBytesRead, nil));
outputStream.Write(buff, lpBytesRead);
//Если сервер вернул маркер начала пакета, то будем считывать весь пакет.
if IsPacketBound(outputStream.Memory, outputStream.Size, MB_START) then
begin
outputStream.Size := 0;
while True do
begin
if ReadFile(pipe, buff[0], MAXWORD, lpBytesRead, nil) then
begin
if IsPacketBound(@buff[0], lpBytesRead, MB_END) then
break
else
outputStream.Write(buff, lpBytesRead);
end;
end;
end;
result := TXMLDocument.Create(nil);
result.LoadFromStream(outputStream);
finally
outputStream.Free;
end;
finally
if pipe <> INVALID_HANDLE_VALUE then
CloseHandle(pipe);
end;
end;
finally
inputStream.Free;
end;
end;
Добавлено 29.07.2016. Но функция, приведённая выше не совершенна, и если наш сервер упал и не отвечает, то программа в ней зависнет и будет до бесконечности ждать, пока сервер не пришлёт ответ. Это можно исправить, если ввести таймауты для каждой отдельной операции при работе с каналом (подключение к каналу, чтение из него и запись в него). В этом случае функция существенно усложнится. Вот что получится:
//Функция отправляет сообщение серверу и ждёт ответа указанное в параметре timeout время (в мс), если параметр waitForAnswer выставлен в true.
function TForm2.SendMessageToServer(Message: IXMLDocument;
waitForAnswer: boolean; timeout: DWORD): IXMLDocument;
var
inputStream, outputStream: TMemoryStream;
buff: array [0..MAXWORD - 1] of Char;
lpBytesRead: DWORD;
lpNumberOfBytesWritten: DWORD;
lpMode: DWORD;
pipe: THandle;
isPacket: boolean;
pendingRead: boolean;
lastError: integer;
olapRead: TOverlapped;
olapWrite: TOverlapped;
events: array [0 .. 0] of THandle;
function IsPacketBound(buffer: Pointer; size: Int64; controlCode: Cardinal): boolean;
var
lpControlMsg : PPipeMsgBlock;
begin
result := false;
if size = SizeOf(TPipeMsgBlock) then
begin
lpControlMsg := PPipeMsgBlock(buffer);
if (lpControlMsg^.Size = SizeOf(TPipeMsgBlock))
and (lpControlMsg^.MagicStart = MB_MAGIC)
and (lpControlMsg^.MagicEnd = MB_MAGIC)
and (lpControlMsg^.ControlCode = controlCode) then
result := true;
end;
end;
function AddPortion: boolean;
begin
result := false;
if (not isPacket) and IsPacketBound(@buff[0], lpBytesRead, MB_START) then
isPacket := true
else if (not isPacket) then
begin
outputStream.Write(buff, lpBytesRead);
result := true;
end
else if isPacket then
begin
if IsPacketBound(@buff[0], lpBytesRead, MB_END) then
begin
isPacket := false;
result := true;
end
else
outputStream.Write(buff, lpBytesRead);
end;
end;
procedure ReadMessage(timeout: integer; generateTimeoutError: boolean);
begin
while true do
begin
pendingRead := false;
if ReadFile(pipe, buff[0], MAXWORD, lpBytesRead, @olapRead) then
begin
if AddPortion then
exit;
ResetEvent(olapRead.hEvent);
end
else
begin
lastError := GetLastError;
if lastError = ERROR_IO_PENDING then
pendingRead := true
else if (lastError = ERROR_MORE_DATA) then
begin
if AddPortion then
exit;
end
else
RaiseLastOSError(lastError);
end;
if pendingRead then
begin
events[0] := olapRead.hEvent;
case WaitForMultipleObjects(1, @events, false, timeout) of
WAIT_OBJECT_0:
begin
if GetOverlappedResult(pipe, olapRead, lpBytesRead, true) then
begin
if AddPortion then
exit;
end
else
begin
lastError := GetLastError;
if lastError = ERROR_MORE_DATA then
begin
if AddPortion then
exit;
end
end;
end;
WAIT_TIMEOUT:
if generateTimeoutError then
raise Exception.Create('За отведённое время не получен ответ.')
else
break;
else
RaiseLastOSError;
end;
end;
end;
end;
begin
isPacket := false;
pipe := INVALID_HANDLE_VALUE;
pendingRead := false;
ClearOverlapped(olapRead, True);
ClearOverlapped(olapWrite, True);
result := nil;
inputStream := TMemoryStream.Create;
try
Message.SaveToStream(inputStream);
if not waitForAnswer then
PipeClient1.SendStream(inputStream)
else
begin
try
olapRead.hEvent := CreateEvent(nil, True, False, nil);
olapWrite.hEvent := CreateEvent(nil, True, False, nil);
try
outputStream := TMemoryStream.Create;
try
while True do
begin
pipe := CreateFile(PChar('\\.\pipe\' + PipeClient1.PipeName), GENERIC_READ or GENERIC_WRITE,
0, nil, OPEN_EXISTING, FILE_FLAG_OVERLAPPED, 0);
if pipe <> INVALID_HANDLE_VALUE then
break;
if GetLastError <> ERROR_PIPE_BUSY then
RaiseLastOSError;
Win32Check(WaitNamedPipe(PChar('\\.\pipe\' + PipeClient1.PipeName), timeout));
end;
lpMode := PIPE_READMODE_MESSAGE;
Win32Check(SetNamedPipeHandleState(pipe, lpMode, nil, nil));
while PeekNamedPipe(pipe, nil, 0, nil, @lpBytesRead, nil) and (lpBytesRead > 0) do
begin
ReadMessage(timeout, false);
outputStream.Size := 0;
end;
if WriteFile(pipe, inputStream.Memory^, inputStream.Size,
lpNumberOfBytesWritten, @olapWrite) then
begin
FlushFileBuffers(pipe);
ResetEvent(olapWrite.hEvent);
end
else
begin
lastError := GetLastError;
if lastError = ERROR_IO_PENDING then
begin
events[0] := olapWrite.hEvent;
case WaitForMultipleObjects(1, @events, false, timeout) of
WAIT_OBJECT_0:
begin
if GetOverlappedResult(pipe, olapWrite, lpNumberOfBytesWritten, True) then
FlushFileBuffers(pipe)
else
RaiseLastOSError;
end;
WAIT_TIMEOUT:
raise Exception.Create('За отведённое время не удалось отправить сообщение.');
else
RaiseLastOSError;
end;
end
else
RaiseLastOSError(lastError);
end;
ReadMessage(timeout, true);
result := TXMLDocument.Create(nil);
result.LoadFromStream(outputStream);
finally
outputStream.Free;
end;
finally
CloseHandle(olapRead.hEvent);
CloseHandle(olapWrite.hEvent);
end;
finally
if pipe <> INVALID_HANDLE_VALUE then
CloseHandle(pipe);
end;
end;
finally
inputStream.Free;
end;
end;
Подведём итог. Используя вышеописанное решение, вы можете быстро наладить взаимодействие между несколькими процессами на одном компьютере под управлением Windows. Вы сможете отправлять большие объёмы информации и реализовать синхронное и асинхронное взаимодействие.
Комментарии
Ещё поковырялся с этой проблемой. Оказалось, что клиент зависает из-за зависания сервера в функции TPipeThread.CompleteRead в строке
Result := GetOverlappedResult(FPipe, FOlapRead, FRcvRead, TRUE);
Попробуйте заменить код функции на следующий (у меня так работает):
function TPipeThread.CompleteRead : Boolean;
var
bContinue: boolean;
begin
// Reset the read event and pending flag
ResetEvent(FOlapRead.hEvent);
// Reset pending read
FPendingRead := FALSE;
// Check the overlapped results
//Result := GetOverlappedResult(FPipe, FOlapRead, FRcvRead, TRUE);
bContinue := true;
while bContinue do
begin
bContinue := false;
Result := GetOverlappedResult(FPipe, FOlapRead, FRcvRead, FALSE);
// Handle failure
if not(Result) then begin
// Get the last error code
FErrorCode := GetLastError;
if (FErrorCode = ERROR_IO_INCOMPLETE) then begin
// Operation is still pending, allow while loop
bContinue := true;
FErrorCode := 0;
end
else if (FErrorCode = ERROR_HANDLE_EOF) then begin
FErrorCode := 0;
Result := true;
end
// Check for more data
else if (FErrorCode = ERROR_MORE_DATA) then begin
// Write the current data to the stream
FRcvStream.Write(FRcvBuffer^, FRcvSize);
// Determine how much we need to expand the buffer to
Result := PeekNamedPipe(FPipe, nil, 0, nil, nil, @FRcvSize);
// Check result
if Result then begin
// Determine if required size is larger than allocated size
if (FRcvSize > FRcvAlloc) then begin
// Realloc buffer
ReallocMem(FRcvBuffer, FRcvSize);
// Update allocated size
FRcvAlloc := FRcvSize;
end;
// Set overlapped fields
ClearOverlapped(FOlapRead);
// Read from the file again
Result := ReadFile(FPipe, FRcvBuffer^, FRcvSize, FRcvRead,
@FOlapRead);
// Handle error
if not(Result) then begin
// Set error code
FErrorCode := GetLastError;
// Check for pending again, which means our state hasn't changed
if (FErrorCode = ERROR_IO_PENDING) then begin
// Still a pending read
FPendingRead := TRUE;
// Success
Result := TRUE;
end;
end;
end
else
// Set error code
FErrorCode := GetLastError;
end;
end;
end;
// Check result and pending read flag
if Result and not(FPendingRead) then begin
// We have the full message
FRcvStream.Write(FRcvBuffer^, FRcvRead);
// Call the OnData
DoMessage;
end;
end;
Как-то через раз работает. Если отправлять с клиента на сервер, то всё нормально, как только отсылаю с сервера на клиент - сообщение не доходит и после этого уже и с клиента не идёт.
У меня хорошо работает. Попробуйте мои тестовые проекты (в архиве исходники и exe-шники): yadi.sk/d/eZF1Iog536qCYv
И на клиенте и на сервере получаю уведомление об отправке, но сообщения не доходят и клиент виснет.
cloud.mail.ru/public/L2wM/dkiGFWanF
У меня всё работает поэтому мне сложно понять, что идёт не так. На какой строке кода виснет?
При пошаговом выполнении в отадчике сообщения доходят и всё работает, но стоит запустить без отладчика, как сообщения перестают доходить, всё обрывается на отправке заголовка, как и раньше
RSS лента комментариев этой записи