- Get link
- X
- Other Apps
In one of my projects I needed to have multiple instance of the same process running in parallel and each one of them if it fails for some reason needs to restart immediately. The idea was to have a message queue, in this case I'm using Azure, and multiple competing consumers (Each process is a consumer).
I had differences options:
1. Create multiple windows services that would be managed by the operating system, but the main down side is that whenever I need to scale by increasing the number of consumers I'll have to install a new windows service, likewise if I want to scale down I'd have to stop services.
2. Create a windows service that is in charge of starting and maintaining a certain number of processes running at any time depending on the a configuration file.
3. The third option I've considered is using Azure workers, but I quickly dismissed it when I found out how much it was costing me, since these consumers are running continuously, but if it was for a limited workload and limited amount of time this solution would have been a viable one.
I've decided to implement solution 2, and I had to develop a windows service that'll manage a list of process configured in the app.config:
ProcessRunnerConfigurationElement:
ProcessRunnerConfigurationElementCollection:
Below the details of the method GetProcesses, this method will create the appropriate number of processes with the right parameters and ProcessInfo:
I had differences options:
1. Create multiple windows services that would be managed by the operating system, but the main down side is that whenever I need to scale by increasing the number of consumers I'll have to install a new windows service, likewise if I want to scale down I'd have to stop services.
2. Create a windows service that is in charge of starting and maintaining a certain number of processes running at any time depending on the a configuration file.
3. The third option I've considered is using Azure workers, but I quickly dismissed it when I found out how much it was costing me, since these consumers are running continuously, but if it was for a limited workload and limited amount of time this solution would have been a viable one.
I've decided to implement solution 2, and I had to develop a windows service that'll manage a list of process configured in the app.config:
<processessection> <processes> <add arguments="DownloadPrices" nbinstances="2" path="myExe1.exe" templatename="Exe1_{0}"> <add arguments="ExtractProducts" nbinstances="2" path="myExe2.exe" templatename="Exe2_{0}"> </add></add></processes> </processessection>The classes below provide a way to define and read the configuration, as the example above, in C#:
- Processes Section:
public class ProcessRunnerConfigSection : ConfigurationSection { [ConfigurationProperty("Processes", IsDefaultCollection = false)] [ConfigurationCollection(typeof(ProcessRunnerConfigurationElementCollection), AddItemName = "add", ClearItemsName = "clear", RemoveItemName = "remove")] public ProcessRunnerConfigurationElementCollection Processes { get { return (ProcessRunnerConfigurationElementCollection)base["Processes"]; } } }
public class ProcessRunnerConfigurationElement : ConfigurationElement { [ConfigurationProperty("TemplateName", DefaultValue = "_{0}", IsRequired = true, IsKey = true)] public string TemplateName { get { return (string)this["TemplateName"]; } set { this["TemplateName"] = value; } } [ConfigurationProperty("Path", DefaultValue = "", IsRequired = true, IsKey = false)] public string Path { get { return (string)this["Path"]; } set { this["Path"] = value; } } [ConfigurationProperty("Arguments", DefaultValue = "", IsRequired = false, IsKey = false)] public string Arguments { get { return (string)this["Arguments"]; } set { this["Arguments"] = value; } } [ConfigurationProperty("NbInstances", DefaultValue = 1, IsRequired = false, IsKey = false)] public int NbInstances { get { return (int)this["NbInstances"]; } set { this["NbInstances"] = value; } } [ConfigurationProperty("RestartInterval", DefaultValue = 360, IsRequired = false, IsKey = false)] public int RestartInterval { get { return (int)this["RestartInterval"]; } set { this["RestartInterval"] = value; } } }
public class ProcessRunnerConfigurationElementCollection : ConfigurationElementCollection { public ProcessRunnerConfigurationElementCollection() { Console.WriteLine("ServiceCollection Constructor"); } public ProcessRunnerConfigurationElement this[int index] { get { return (ProcessRunnerConfigurationElement)BaseGet(index); } set { if (BaseGet(index) != null) { BaseRemoveAt(index); } BaseAdd(index, value); } } public void Add(ProcessRunnerConfigurationElement serviceConfig) { BaseAdd(serviceConfig); } public void Clear() { BaseClear(); } protected override ConfigurationElement CreateNewElement() { return new ProcessRunnerConfigurationElement(); } protected override object GetElementKey(ConfigurationElement element) { return ((ProcessRunnerConfigurationElement)element).TemplateName; } public void Remove(ProcessRunnerConfigurationElement serviceConfig) { BaseRemove(serviceConfig.TemplateName); } public void RemoveAt(int index) { BaseRemoveAt(index); } public void Remove(string name) { BaseRemove(name); } }No That we have the configuration ready, we can focus on the implementation of the windows service:
- The service needs to read the configuration
- Start the configured number of processes
- Re-start any process that stops
- Stop all children processes when the service stops
The new windows service code will look like the code below:
public partial class ProcessesRunner : ServiceBase { protected override void OnStart(string[] args) { // 1. Read the configuration ProcessRunnerConfigSection processRunnerConfigSection = ConfigurationManager.GetSection("ProcessesSection") as ProcessRunnerConfigSection; var processesConfigs = processRunnerConfigSection.Processes; foreach (ProcessRunnerConfigurationElement processConfig in processesConfigs) { // Create the processes from the configuation var processes = GetProcesses(processConfig.TemplateName, processConfig.Path, processConfig.Arguments, processConfig.NbInstances, processConfig.RestartInterval); foreach (var item in processes) { var p = item.Value; p.Exited += (object sender, System.EventArgs e) => { p.Start(); }; // Start the process p.Start(); } } } }
static IDictionaryIn this implementation we are covering from the 4 steps above only steps 1 to 3, which could be enough in most cases, in my cases I needed to have the processes stiped automatically when the windows service is stopped, therefor I had to make the following changes:GetProcesses(string templateName, string path, string arguments, int nbInstances, int restartInterval) { var result = new Dictionary (); for (int i = 0; i < nbInstances; i++) { var p = new Process { EnableRaisingEvents = true }; p.StartInfo = new ProcessStartInfo { FileName = path, Arguments = arguments, CreateNoWindow = true, ErrorDialog = false, WindowStyle = ProcessWindowStyle.Hidden }; var currentName = string.Format(templateName, i + 1); result.Add(currentName, p); } return result; }
- Add this new class that encapsulate some interop methods, which I took as is from the internet:
- Now we need to add three line of code in the OnStart method of the Windows Service:
public class Job : IDisposable { [DllImport("kernel32.dll", CharSet = CharSet.Unicode)] static extern IntPtr CreateJobObject(object a, string lpName); [DllImport("kernel32.dll")] static extern bool SetInformationJobObject(IntPtr hJob, JobObjectInfoType infoType, IntPtr lpJobObjectInfo, uint cbJobObjectInfoLength); [DllImport("kernel32.dll", SetLastError = true)] static extern bool AssignProcessToJobObject(IntPtr job, IntPtr process); [DllImport("kernel32.dll", SetLastError = true)] [ReliabilityContract(Consistency.WillNotCorruptState, Cer.Success)] [SuppressUnmanagedCodeSecurity] [return: MarshalAs(UnmanagedType.Bool)] static extern bool CloseHandle(IntPtr hObject); private IntPtr m_handle; private bool m_disposed = false; public Job() { m_handle = CreateJobObject(null, null); JOBOBJECT_BASIC_LIMIT_INFORMATION info = new JOBOBJECT_BASIC_LIMIT_INFORMATION(); info.LimitFlags = 0x2000; JOBOBJECT_EXTENDED_LIMIT_INFORMATION extendedInfo = new JOBOBJECT_EXTENDED_LIMIT_INFORMATION(); extendedInfo.BasicLimitInformation = info; int length = Marshal.SizeOf(typeof(JOBOBJECT_EXTENDED_LIMIT_INFORMATION)); IntPtr extendedInfoPtr = Marshal.AllocHGlobal(length); Marshal.StructureToPtr(extendedInfo, extendedInfoPtr, false); if (!SetInformationJobObject(m_handle, JobObjectInfoType.ExtendedLimitInformation, extendedInfoPtr, (uint)length)) throw new Exception(string.Format("Unable to set information. Error: {0}", Marshal.GetLastWin32Error())); } #region IDisposable Members public void Dispose() { Dispose(true); GC.SuppressFinalize(this); } #endregion private void Dispose(bool disposing) { if (m_disposed) return; if (disposing) { } Close(); m_disposed = true; } public void Close() { CloseHandle(m_handle); m_handle = IntPtr.Zero; } public bool AddProcess(IntPtr handle) { return AssignProcessToJobObject(m_handle, handle); } }It also requires the following structs:
public enum JobObjectInfoType { AssociateCompletionPortInformation = 7, BasicLimitInformation = 2, BasicUIRestrictions = 4, EndOfJobTimeInformation = 6, ExtendedLimitInformation = 9, SecurityLimitInformation = 5, GroupInformation = 11 } [StructLayout(LayoutKind.Sequential)] public struct SECURITY_ATTRIBUTES { public int nLength; public IntPtr lpSecurityDescriptor; public int bInheritHandle; } [StructLayout(LayoutKind.Sequential)] struct JOBOBJECT_BASIC_LIMIT_INFORMATION { public Int64 PerProcessUserTimeLimit; public Int64 PerJobUserTimeLimit; public Int16 LimitFlags; public UInt32 MinimumWorkingSetSize; public UInt32 MaximumWorkingSetSize; public Int16 ActiveProcessLimit; public Int64 Affinity; public Int16 PriorityClass; public Int16 SchedulingClass; } [StructLayout(LayoutKind.Sequential)] struct IO_COUNTERS { public UInt64 ReadOperationCount; public UInt64 WriteOperationCount; public UInt64 OtherOperationCount; public UInt64 ReadTransferCount; public UInt64 WriteTransferCount; public UInt64 OtherTransferCount; } [StructLayout(LayoutKind.Sequential)] struct JOBOBJECT_EXTENDED_LIMIT_INFORMATION { public JOBOBJECT_BASIC_LIMIT_INFORMATION BasicLimitInformation; public IO_COUNTERS IoInfo; public UInt32 ProcessMemoryLimit; public UInt32 JobMemoryLimit; public UInt32 PeakProcessMemoryUsed; public UInt32 PeakJobMemoryUsed; }
protected override void OnStart(string[] args) { ProcessRunnerConfigSection processRunnerConfigSection = ConfigurationManager.GetSection("ProcessesSection") as ProcessRunnerConfigSection; //Create an instance of Job var job = new Job(); var processesConfigs = processRunnerConfigSection.Processes; foreach (ProcessRunnerConfigurationElement processConfig in processesConfigs) { var processes = GetProcesses(processConfig.TemplateName, processConfig.Path, processConfig.Arguments, processConfig.NbInstances, processConfig.RestartInterval); foreach (var item in processes) { var p = item.Value; p.Exited += (object sender, System.EventArgs e) => { p.Start(); // Add the process to Job job.AddProcess(p.Handle); }; p.Start(); // Add the process to Job job.AddProcess(p.Handle); } } }
Comments
Post a Comment